retrieve database

This commit is contained in:
Jonny Dixon 2025-06-11 12:12:45 +01:00
parent 12ca57c818
commit 21c4903e55
4 changed files with 272 additions and 51 deletions

View File

@ -92,7 +92,17 @@ class FivetranSource(StatefulIngestionSourceBase):
connector.connector_id, PlatformDetail()
)
# Apply source platform details from additional_properties if available (from log API enhancement)
if "source_platform" in connector.additional_properties:
source_details.platform = connector.additional_properties["source_platform"]
if "source_platform_instance" in connector.additional_properties:
source_details.platform_instance = connector.additional_properties[
"source_platform_instance"
]
if "source_env" in connector.additional_properties:
source_details.env = connector.additional_properties["source_env"]
# Map connector type to known platform if needed (only if not already set)
if source_details.platform is None:
connector_type = connector.connector_type.lower()
if connector_type in FIVETRAN_PLATFORM_TO_DATAHUB_PLATFORM:
@ -104,6 +114,15 @@ class FivetranSource(StatefulIngestionSourceBase):
# Auto-detect source database if not present in config
if source_details.database is None:
# Check if source database is available in additional_properties first (from log API enhancement)
if "source_database" in connector.additional_properties:
source_details.database = connector.additional_properties[
"source_database"
]
logger.info(
f"Using source database '{source_details.database}' from additional properties for connector {connector.connector_id}"
)
else:
# Try to extract source database from connector's additional properties
detected_db = self._extract_source_database_from_connector(connector)
if detected_db:
@ -209,6 +228,14 @@ class FivetranSource(StatefulIngestionSourceBase):
connector.destination_id, PlatformDetail()
)
# Apply destination platform details from additional_properties if available (from log API enhancement)
if "destination_platform_instance" in connector.additional_properties:
destination_details.platform_instance = connector.additional_properties[
"destination_platform_instance"
]
if "destination_env" in connector.additional_properties:
destination_details.env = connector.additional_properties["destination_env"]
# Set platform if not present
if destination_details.platform is None:
# First check if there's a destination platform in additional properties
@ -417,24 +444,23 @@ class FivetranSource(StatefulIngestionSourceBase):
# Include database in the table name if available
database = details.database if details.database else ""
# Normalize all components to lowercase for URN consistency
# This ensures proper lineage matching across DataHub
database = database.lower() if database else ""
# Split table_name and normalize each part
if "." in table_name:
# Handle schema.table format
parts = table_name.split(".")
table_name = ".".join(part.lower() for part in parts)
else:
table_name = table_name.lower()
# Build the full table name
full_table_name = f"{database}.{table_name}" if database else table_name
logger.debug(
f"Normalized dataset URN table name: {full_table_name} (platform: {platform})"
)
# Ensure environment is set
env = details.env or "PROD"
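
For illustration, a minimal self-contained sketch of the normalization applied above; the helper name and sample values are hypothetical, not part of the source:

def normalize_dataset_name(database: str, table_name: str) -> str:
    """Lowercase every component so dataset URNs match regardless of source casing."""
    database = database.lower() if database else ""
    # Lowercase each dot-separated part of schema.table (or a bare table name)
    table_name = ".".join(part.lower() for part in table_name.split("."))
    return f"{database}.{table_name}" if database else table_name

# BigQuery-style and warehouse-style names both normalize the same way
assert normalize_dataset_name("My_Project", "Analytics.Orders") == "my_project.analytics.orders"
assert normalize_dataset_name("", "PUBLIC.USERS") == "public.users"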
@ -488,10 +514,6 @@ class FivetranSource(StatefulIngestionSourceBase):
logger.info(f"Creating column lineage from {source_urn} to {dest_urn}") logger.info(f"Creating column lineage from {source_urn} to {dest_urn}")
# Extract destination platform from the URN
dest_platform = str(dest_urn).split(",")[0].split(":")[-1]
is_bigquery = dest_platform.lower() == "bigquery"
if not lineage.column_lineage: if not lineage.column_lineage:
logger.warning( logger.warning(
f"No column lineage data available for {lineage.source_table} -> {lineage.destination_table}" f"No column lineage data available for {lineage.source_table} -> {lineage.destination_table}"
@ -527,18 +549,16 @@ class FivetranSource(StatefulIngestionSourceBase):
# Process valid column mappings
for column_lineage in valid_lineage:
try:
# Normalize column names to lowercase for consistency
source_column = column_lineage.source_column.lower()
dest_column = column_lineage.destination_column.lower()
# Create field URNs with normalized column names
source_field_urn = builder.make_schema_field_urn(
str(source_urn),
source_column,
)
dest_field_urn = builder.make_schema_field_urn(
str(dest_urn),
dest_column,
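
As a hedged usage sketch, the same lowercasing feeds into schema-field URNs via datahub.emitter.mce_builder, as used by the surrounding code; the dataset names and column below are made up:

import datahub.emitter.mce_builder as builder

# Hypothetical, already-normalized dataset names
source_urn = builder.make_dataset_urn("postgres", "postgres_db.public.employee", "PROD")
dest_urn = builder.make_dataset_urn("snowflake", "test_database.postgres_public.employee", "PROD")

# Lowercasing columns first means "Employee_ID" and "employee_id" yield the same field URN
source_field_urn = builder.make_schema_field_urn(source_urn, "employee_id")
dest_field_urn = builder.make_schema_field_urn(dest_urn, "employee_id")
print(source_field_urn)
print(dest_field_urn)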

View File

@ -61,7 +61,7 @@ class FivetranLogAPI(FivetranAccessInterface):
dialect=self.fivetran_log_config.destination_platform, pretty=True
)
self.engine.execute(test_query)  # type: ignore[attr-defined]
logger.info("Successfully tested database connection")
return True
except SQLAlchemyError as e:
@ -89,7 +89,7 @@ class FivetranLogAPI(FivetranAccessInterface):
snowflake_destination_config.get_sql_alchemy_url(),
**snowflake_destination_config.get_options(),
)
engine.execute(  # type: ignore[attr-defined]
fivetran_log_query.use_database(
snowflake_destination_config.database,
)
@ -109,7 +109,7 @@ class FivetranLogAPI(FivetranAccessInterface):
fivetran_log_query.set_schema(bigquery_destination_config.dataset)
# The "database" should be the BigQuery project name.
fivetran_log_database = engine.execute(  # type: ignore[attr-defined]
"SELECT @@project_id"
).fetchone()[0]
else:
@ -129,7 +129,7 @@ class FivetranLogAPI(FivetranAccessInterface):
dialect=self.fivetran_log_config.destination_platform, pretty=True
)
logger.info(f"Executing query: {query}")
resp = self.engine.execute(query)  # type: ignore[attr-defined]
return [row for row in resp]
def _get_column_lineage_metadata(
@ -341,6 +341,20 @@ class FivetranLogAPI(FivetranAccessInterface):
)
continue
# Prepare additional properties for database detection
additional_properties = {}
# Set destination platform based on configuration
if self.fivetran_log_config.destination_platform:
additional_properties["destination_platform"] = (
self.fivetran_log_config.destination_platform
)
# Set destination database based on configuration
destination_database = self._get_destination_database_from_config()
if destination_database:
additional_properties["destination_database"] = destination_database # type: ignore[assignment]
connectors.append(
Connector(
connector_id=connector_id,
@ -352,6 +366,7 @@ class FivetranLogAPI(FivetranAccessInterface):
user_id=connector[Constant.CONNECTING_USER_ID],
lineage=[], # filled later
jobs=[], # filled later
additional_properties=additional_properties,
)
)
@ -365,7 +380,165 @@ class FivetranLogAPI(FivetranAccessInterface):
with report.metadata_extraction_perf.connectors_lineage_extraction_sec:
logger.info("Fetching connector lineage")
self._fill_connectors_lineage(connectors)
# After lineage is filled, try to extract source database information
self._enhance_connectors_with_database_info(connectors)
with report.metadata_extraction_perf.connectors_jobs_extraction_sec:
logger.info("Fetching connector job run history")
self._fill_connectors_jobs(connectors, syncs_interval)
return connectors
def _get_destination_database_from_config(self) -> Optional[str]:
"""Extract destination database name from log configuration."""
if self.fivetran_log_config.destination_platform == "snowflake":
if self.fivetran_log_config.snowflake_destination_config:
return self.fivetran_log_config.snowflake_destination_config.database
elif self.fivetran_log_config.destination_platform == "bigquery":
if self.fivetran_log_config.bigquery_destination_config:
return self.fivetran_log_config.bigquery_destination_config.dataset
return None
def _enhance_connectors_with_database_info(
self, connectors: List[Connector]
) -> None:
"""
Enhance connectors with database and platform information from configuration mappings and lineage data.
This method:
1. Applies source platform/instance/database mappings from sources_to_platform_instance
2. Applies destination platform/instance/database mappings from destination_to_platform_instance
3. Extracts source database from lineage when not available from config
"""
for connector in connectors:
# Apply source platform instance mappings if configured
if (
self.config
and hasattr(self.config, "sources_to_platform_instance")
and connector.connector_id in self.config.sources_to_platform_instance
):
source_details = self.config.sources_to_platform_instance[
connector.connector_id
]
logger.info(
f"Applying source platform mapping for connector {connector.connector_id}: {source_details}"
)
# Apply source platform details to additional_properties
if source_details.platform:
connector.additional_properties["source_platform"] = (
source_details.platform
)
if source_details.platform_instance:
connector.additional_properties["source_platform_instance"] = (
source_details.platform_instance
)
if source_details.database:
connector.additional_properties["source_database"] = (
source_details.database
)
if source_details.env:
connector.additional_properties["source_env"] = source_details.env
# Apply destination platform instance mappings if configured
if (
self.config
and hasattr(self.config, "destination_to_platform_instance")
and connector.destination_id
in self.config.destination_to_platform_instance
):
dest_details = self.config.destination_to_platform_instance[
connector.destination_id
]
logger.info(
f"Applying destination platform mapping for destination {connector.destination_id}: {dest_details}"
)
# Apply destination platform details to additional_properties (may override config-based values)
if dest_details.platform:
connector.additional_properties["destination_platform"] = (
dest_details.platform
)
if dest_details.platform_instance:
connector.additional_properties["destination_platform_instance"] = (
dest_details.platform_instance
)
if dest_details.database:
connector.additional_properties["destination_database"] = (
dest_details.database
)
if dest_details.env:
connector.additional_properties["destination_env"] = (
dest_details.env
)
# Try to extract source database from lineage if not already set from mappings
if "source_database" not in connector.additional_properties:
source_database = self._extract_source_database_from_lineage(connector)
if source_database:
connector.additional_properties["source_database"] = source_database
logger.info(
f"Extracted source database '{source_database}' from lineage for connector {connector.connector_id}"
)
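
As a rough illustration of how the mappings above land in additional_properties, here is a standalone sketch with simplified stand-ins for Connector and PlatformDetail (the real classes come from the Fivetran source's config and data modules):

from dataclasses import dataclass, field
from typing import Dict, Optional

@dataclass
class PlatformDetail:  # simplified stand-in for the real config class
    platform: Optional[str] = None
    platform_instance: Optional[str] = None
    database: Optional[str] = None
    env: Optional[str] = None

@dataclass
class Connector:  # simplified stand-in for the real data class
    connector_id: str
    additional_properties: Dict[str, str] = field(default_factory=dict)

# Hypothetical mapping, keyed by connector id as in sources_to_platform_instance
sources_to_platform_instance = {
    "postgres_connector_1": PlatformDetail(
        platform="postgres", platform_instance="primary", database="postgres_db", env="DEV"
    )
}

connector = Connector(connector_id="postgres_connector_1")
detail = sources_to_platform_instance.get(connector.connector_id)
if detail:
    # Copy only the fields that are actually set, mirroring the method above
    for key, value in {
        "source_platform": detail.platform,
        "source_platform_instance": detail.platform_instance,
        "source_database": detail.database,
        "source_env": detail.env,
    }.items():
        if value:
            connector.additional_properties[key] = value

print(connector.additional_properties)
# {'source_platform': 'postgres', 'source_platform_instance': 'primary',
#  'source_database': 'postgres_db', 'source_env': 'DEV'}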
def _extract_source_database_from_lineage(
self, connector: Connector
) -> Optional[str]:
"""
Extract source database name from connector lineage data.
This method analyzes the source table names in lineage to infer
the source database name when it's not explicitly available.
"""
if not connector.lineage:
return None
# Collect potential database names from source tables
database_candidates = set()
for lineage_entry in connector.lineage[
:10
]: # Check first 10 entries for consistency
if lineage_entry.source_table and "." in lineage_entry.source_table:
# Parse source table format: potentially database.schema.table or schema.table
parts = lineage_entry.source_table.split(".")
if len(parts) >= 3:
# database.schema.table format - first part is database
database_candidates.add(parts[0])
elif len(parts) == 2:
# schema.table format - might be database.table for some connectors
# Only consider as database if it doesn't look like a typical schema name
potential_db = parts[0]
excluded_schemas = {
"public",
"dbo",
"information_schema",
"sys",
"default",
"main",
"schema",
}
if (
potential_db.lower() not in excluded_schemas
and len(potential_db) > 2
):
database_candidates.add(potential_db)
# If we have exactly one consistent database name, use it
if len(database_candidates) == 1:
database_name = next(iter(database_candidates))
logger.debug(
f"Inferred source database '{database_name}' from lineage consistency for connector {connector.connector_id}"
)
return database_name
elif len(database_candidates) > 1:
logger.debug(
f"Multiple database candidates found for connector {connector.connector_id}: {database_candidates}. "
"Unable to determine single source database."
)
else:
logger.debug(
f"No database information could be inferred from lineage for connector {connector.connector_id}"
)
return None
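
A condensed, self-contained sketch of the inference heuristic above; the function name and table names are made up to show when a single database can, and cannot, be inferred:

from typing import List, Optional

EXCLUDED_SCHEMAS = {"public", "dbo", "information_schema", "sys", "default", "main", "schema"}

def infer_source_database(source_tables: List[str]) -> Optional[str]:
    """Return a database name only when the sampled tables agree on exactly one candidate."""
    candidates = set()
    for table in source_tables[:10]:  # sample the first 10 entries, as above
        parts = table.split(".")
        if len(parts) >= 3:
            candidates.add(parts[0])  # database.schema.table
        elif len(parts) == 2 and parts[0].lower() not in EXCLUDED_SCHEMAS and len(parts[0]) > 2:
            candidates.add(parts[0])  # possibly database.table
    return next(iter(candidates)) if len(candidates) == 1 else None

print(infer_source_database(["salesdb.crm.accounts", "salesdb.crm.contacts"]))  # salesdb
print(infer_source_database(["public.users", "public.orders"]))                 # None: excluded schema
print(infer_source_database(["db_a.s.t", "db_b.s.t"]))                          # None: ambiguous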

View File

@ -7,11 +7,15 @@
"aspect": { "aspect": {
"json": { "json": {
"customProperties": { "customProperties": {
"destination_platform": "snowflake",
"destination_database": "test_database",
"source_platform": "postgres",
"source_database": "postgres_db",
"source_env": "DEV",
"connector_type": "postgres", "connector_type": "postgres",
"sync_frequency": "1440", "sync_frequency": "1440",
"paused": "False", "paused": "False",
"destination_id": "interval_unconstitutional", "destination_id": "interval_unconstitutional"
"destination_platform": "snowflake"
}, },
"name": "postgres", "name": "postgres",
"description": "Fivetran connector for postgres to snowflake", "description": "Fivetran connector for postgres to snowflake",
@ -114,7 +118,10 @@
"destination.platform": "snowflake", "destination.platform": "snowflake",
"destination.env": "PROD", "destination.env": "PROD",
"destination.database": "test_database", "destination.database": "test_database",
"destination.column_naming_pattern": "auto" "destination.column_naming_pattern": "auto",
"connector.source_platform": "postgres",
"connector.source_database": "postgres_db",
"connector.source_env": "DEV"
},
"name": "postgres",
"description": "Fivetran data pipeline from postgres to snowflake",
@ -652,11 +659,15 @@
"aspect": { "aspect": {
"json": { "json": {
"customProperties": { "customProperties": {
"destination_platform": "snowflake",
"destination_database": "test_database",
"source_platform": "kafka",
"source_database": "kafka_prod",
"source_env": "PROD",
"connector_type": "confluent_cloud", "connector_type": "confluent_cloud",
"sync_frequency": "1440", "sync_frequency": "1440",
"paused": "False", "paused": "False",
"destination_id": "interval_unconstitutional", "destination_id": "interval_unconstitutional"
"destination_platform": "snowflake"
}, },
"name": "confluent_cloud", "name": "confluent_cloud",
"description": "Fivetran connector for confluent_cloud to snowflake", "description": "Fivetran connector for confluent_cloud to snowflake",
@ -759,7 +770,10 @@
"destination.platform": "snowflake", "destination.platform": "snowflake",
"destination.env": "PROD", "destination.env": "PROD",
"destination.database": "test_database", "destination.database": "test_database",
"destination.column_naming_pattern": "auto" "destination.column_naming_pattern": "auto",
"connector.source_platform": "kafka",
"connector.source_database": "kafka_prod",
"connector.source_env": "PROD"
},
"name": "confluent_cloud",
"description": "Fivetran data pipeline from confluent_cloud to snowflake",

View File

@ -7,11 +7,14 @@
"aspect": { "aspect": {
"json": { "json": {
"customProperties": { "customProperties": {
"destination_platform": "snowflake",
"destination_database": "test_database",
"source_database": "postgres_db",
"source_env": "DEV",
"connector_type": "postgres", "connector_type": "postgres",
"sync_frequency": "1440", "sync_frequency": "1440",
"paused": "False", "paused": "False",
"destination_id": "interval_unconstitutional", "destination_id": "interval_unconstitutional"
"destination_platform": "snowflake"
}, },
"name": "postgres", "name": "postgres",
"description": "Fivetran connector for postgres to snowflake", "description": "Fivetran connector for postgres to snowflake",
@ -97,7 +100,9 @@
"destination.platform": "snowflake", "destination.platform": "snowflake",
"destination.env": "PROD", "destination.env": "PROD",
"destination.database": "test_database", "destination.database": "test_database",
"destination.column_naming_pattern": "auto" "destination.column_naming_pattern": "auto",
"connector.source_database": "postgres_db",
"connector.source_env": "DEV"
},
"name": "postgres",
"description": "Fivetran data pipeline from postgres to snowflake",
@ -643,11 +648,16 @@
"aspect": { "aspect": {
"json": { "json": {
"customProperties": { "customProperties": {
"destination_platform": "snowflake",
"destination_database": "kafka_prod",
"source_platform": "kafka",
"source_database": "kafka_prod",
"source_env": "PROD",
"destination_env": "PROD",
"connector_type": "confluent_cloud", "connector_type": "confluent_cloud",
"sync_frequency": "1440", "sync_frequency": "1440",
"paused": "False", "paused": "False",
"destination_id": "my_confluent_cloud_connector_id", "destination_id": "my_confluent_cloud_connector_id"
"destination_platform": "snowflake"
}, },
"name": "confluent_cloud", "name": "confluent_cloud",
"description": "Fivetran connector for confluent_cloud to snowflake", "description": "Fivetran connector for confluent_cloud to snowflake",
@ -733,7 +743,11 @@
"destination.platform": "kafka", "destination.platform": "kafka",
"destination.env": "PROD", "destination.env": "PROD",
"destination.database": "kafka_prod", "destination.database": "kafka_prod",
"destination.column_naming_pattern": "auto" "destination.column_naming_pattern": "auto",
"connector.source_platform": "kafka",
"connector.source_database": "kafka_prod",
"connector.source_env": "PROD",
"connector.destination_env": "PROD"
},
"name": "confluent_cloud",
"description": "Fivetran data pipeline from confluent_cloud to kafka",