Revert "cll improvements"

This reverts commit 281b6df87568062c8d1d8e6b327d9e1d4236cbfd.
This commit is contained in:
Jonny Dixon 2025-03-24 16:46:42 +00:00
parent 281b6df875
commit fe1a042da5
2 changed files with 66 additions and 697 deletions

View File

@ -1,6 +1,5 @@
import logging
import re
from typing import Any, Dict, Iterable, List, Optional
from typing import Dict, Iterable, List, Optional
import datahub.emitter.mce_builder as builder
from datahub.api.entities.datajob import DataFlow, DataJob
@ -26,12 +25,7 @@ from datahub.ingestion.source.fivetran.config import (
FivetranSourceReport,
PlatformDetail,
)
from datahub.ingestion.source.fivetran.data_classes import (
ColumnLineage,
Connector,
Job,
TableLineage,
)
from datahub.ingestion.source.fivetran.data_classes import Connector, Job, TableLineage
from datahub.ingestion.source.fivetran.fivetran_access import (
create_fivetran_access,
)
@ -1145,270 +1139,6 @@ class FivetranSource(StatefulIngestionSourceBase):
dpi = self._generate_dpi_from_job(job, datajob)
yield from self._get_dpi_workunits(job, dpi)
def _enhance_missing_column_lineage(self, connector: Connector) -> None:
"""
Last-resort attempt to add column lineage to a connector's tables.
This tries various methods to find column information when all else has failed.
"""
if not connector.lineage:
return
# Keep track of tables we've enhanced
enhanced_tables = 0
# Process each table that doesn't have column lineage
for idx, table_lineage in enumerate(connector.lineage):
if table_lineage.column_lineage:
continue # Skip tables that already have column lineage
# Try to enhance this specific table's column lineage
if self._enhance_single_table_column_lineage(connector, idx, table_lineage):
enhanced_tables += 1
if enhanced_tables > 0:
logger.info(
f"Enhanced column lineage for {enhanced_tables} tables in connector {connector.connector_id}"
)
else:
logger.warning(
f"Could not enhance column lineage for any tables in connector {connector.connector_id}"
)
def _enhance_single_table_column_lineage(
self, connector: Connector, idx: int, table_lineage: TableLineage
) -> bool:
"""Helper method to enhance column lineage for a single table to reduce complexity."""
source_table = table_lineage.source_table
destination_table = table_lineage.destination_table
if not source_table or not destination_table or "." not in source_table:
return False
schema_name, table_name = source_table.split(".", 1)
# Get destination platform
destination_platform = connector.additional_properties.get(
"destination_platform", "unknown"
)
try:
# Check if we're using the standard API implementation
from datahub.ingestion.source.fivetran.fivetran_standard_api import (
FivetranStandardAPI,
)
if isinstance(self.fivetran_access, FivetranStandardAPI):
# Use the standard API's get_columns_from_api method
columns = self.fivetran_access._get_columns_from_api(
source_table, connector.connector_id
)
if columns:
column_lineage = self._create_columns_lineage_from_data(
columns, destination_platform
)
if column_lineage:
connector.lineage[idx].column_lineage = column_lineage
logger.info(
f"Added {len(column_lineage)} columns to lineage for {table_lineage.source_table} -> {table_lineage.destination_table}"
)
return True
# Try a different approach - use the connector schemas method
# This should work with any implementation of FivetranAccessInterface
schemas = []
try:
# We need to get the schemas from the appropriate API
# Try to use list_connector_schemas indirectly through the fivetran_access
if hasattr(self.fivetran_access, "api_client") and hasattr(
self.fivetran_access.api_client, "list_connector_schemas"
):
schemas = self.fivetran_access.api_client.list_connector_schemas(
connector.connector_id
)
else:
# Depending on your implementation, there might be other ways to get schemas
# For now, we'll just log and continue
logger.warning(
f"No suitable method found to get schemas for connector {connector.connector_id}"
)
except Exception as schema_e:
logger.warning(
f"Error getting schemas for connector {connector.connector_id}: {schema_e}"
)
if schemas:
return self._find_and_add_column_lineage(
connector,
idx,
schemas,
schema_name,
table_name,
destination_platform,
table_lineage,
)
except Exception as e:
logger.warning(f"Error enhancing column lineage for {source_table}: {e}")
return False
def _find_and_add_column_lineage(
self,
connector: Connector,
idx: int,
schemas: List[Dict],
schema_name: str,
table_name: str,
destination_platform: str,
table_lineage: TableLineage,
) -> bool:
"""Find table in schemas and add column lineage if found."""
for schema in schemas:
if schema.get("name") != schema_name:
continue
for table in schema.get("tables", []):
if not isinstance(table, dict) or table.get("name") != table_name:
continue
# Found the table, try to get columns
columns = self._get_columns_for_table(
connector, schema_name, table_name, table
)
if not columns:
continue
# Create column lineage
column_lineage = self._create_columns_lineage_from_data(
columns, destination_platform
)
if column_lineage:
connector.lineage[idx].column_lineage = column_lineage
logger.info(
f"Added {len(column_lineage)} columns to lineage for {table_lineage.source_table} -> {table_lineage.destination_table}"
)
return True
return False
def _get_columns_for_table(
self, connector: Connector, schema_name: str, table_name: str, table: Dict
) -> List[Any]:
"""Get columns for a table, trying multiple methods."""
# First try columns in table data
columns = table.get("columns", [])
if columns:
return columns
# Try direct API call as fallback - need to handle the interface correctly
try:
# Check if we're using the standard API implementation
from datahub.ingestion.source.fivetran.fivetran_standard_api import (
FivetranStandardAPI,
)
if isinstance(self.fivetran_access, FivetranStandardAPI):
# Use the standard API's method to get columns
return self.fivetran_access._get_columns_from_api(
f"{schema_name}.{table_name}", connector.connector_id
)
# If not using standard API, try alternative approach
if hasattr(self.fivetran_access, "api_client") and hasattr(
self.fivetran_access.api_client, "get_table_columns"
):
columns = self.fivetran_access.api_client.get_table_columns(
connector.connector_id, schema_name, table_name
)
if columns:
return columns
except Exception as e:
logger.debug(f"Failed to get columns via API: {e}")
return []
def _create_columns_lineage_from_data(
self, columns: List[Any], destination_platform: str
) -> List[ColumnLineage]:
"""Create column lineage objects from raw column data."""
column_lineage = []
is_bigquery = destination_platform.lower() == "bigquery"
# Process columns based on format
if isinstance(columns, list):
for column in columns:
col_name = self._extract_column_name(column)
if not col_name or col_name.startswith("_fivetran"):
continue
# Get destination column name
dest_col_name = self._get_destination_column_name(
column, col_name, is_bigquery
)
column_lineage.append(
ColumnLineage(
source_column=col_name, destination_column=dest_col_name
)
)
# Handle dictionary format
elif isinstance(columns, dict):
for col_name, col_data in columns.items():
if col_name.startswith("_fivetran"):
continue
# Get destination column name
dest_col_name = self._get_destination_column_name_from_dict(
col_data, col_name, is_bigquery
)
column_lineage.append(
ColumnLineage(
source_column=col_name, destination_column=dest_col_name
)
)
return column_lineage
def _extract_column_name(self, column: Any) -> Optional[str]:
"""Extract column name from column data."""
if isinstance(column, dict):
return column.get("name")
elif isinstance(column, str):
return column
return None
def _get_destination_column_name(
self, column: Any, col_name: str, is_bigquery: bool
) -> str:
"""Get destination column name, preferring name_in_destination if available."""
if isinstance(column, dict) and "name_in_destination" in column:
return column.get("name_in_destination")
else:
return self._transform_column_name_for_platform(col_name, is_bigquery)
def _get_destination_column_name_from_dict(
self, col_data: Any, col_name: str, is_bigquery: bool
) -> str:
"""Get destination column name from dictionary format column data."""
if isinstance(col_data, dict) and "name_in_destination" in col_data:
return col_data.get("name_in_destination")
else:
return self._transform_column_name_for_platform(col_name, is_bigquery)
def _transform_column_name_for_platform(
self, col_name: str, is_bigquery: bool
) -> str:
"""Transform column name based on destination platform."""
if is_bigquery:
# For BigQuery, convert to snake_case
s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", col_name)
s2 = re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1)
return s2.lower()
else:
# For other platforms like Snowflake, typically uppercase
return col_name.upper()
def _get_connector_workunits(
self, connector: Connector
) -> Iterable[MetadataWorkUnit]:
@ -1423,25 +1153,8 @@ class FivetranSource(StatefulIngestionSourceBase):
# Store field lineage workunits to emit after dataset workunits
field_lineage_workunits = []
# We'll only consider a connector to have valid lineage if it has table lineage
# AND at least one of those tables has column lineage
has_column_lineage = any(
table_lineage.column_lineage for table_lineage in connector.lineage
)
# Special handling for connectors with lineage but no job history
if not connector.jobs and connector.lineage:
# Check if there's any column lineage to include
if not has_column_lineage:
logger.warning(
f"Connector {connector.connector_name} (ID: {connector.connector_id}) "
f"has {len(connector.lineage)} lineage entries but no column lineage. "
"Column-level lineage information will be missing."
)
# Try one last attempt to add column lineage
self._enhance_missing_column_lineage(connector)
logger.info(
f"Connector {connector.connector_name} (ID: {connector.connector_id}) "
f"has {len(connector.lineage)} lineage entries but no job history. "

View File

@ -4,7 +4,7 @@
import difflib
import logging
import re
from typing import Any, Dict, List, Optional, Set, Tuple
from typing import Dict, List, Optional, Set, Tuple
from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.source.fivetran.config import (
@ -235,422 +235,84 @@ class FivetranStandardAPI(FivetranAccessInterface):
def _fill_connectors_lineage(self, connectors: List[Connector]) -> None:
    """
    Fill in lineage information for all connectors with special attention to column lineage.
    """
    # Cache connectors for later use
    self._connector_cache = connectors
    # First process explicitly to make sure lineage is extracted
    for connector in connectors:
        self._fill_connector_lineage(connector)
    # Perform second pass for column lineage enhancement
    self._enhance_column_lineage_for_connectors(connectors)
def _enhance_column_lineage_for_connectors(
    self, connectors: List[Connector]
) -> None:
    """Enhance column lineage for connectors that have tables without column information.

    Per-connector failures are logged and swallowed so one bad connector
    does not abort the whole pass.
    """
    logger.info("Performing secondary column lineage enhancement pass")
    for connector in connectors:
        try:
            if not connector.lineage:
                continue
            # Find tables that need column lineage enhancement.
            tables_missing_columns = self._find_tables_missing_columns(connector)
            if not tables_missing_columns:
                continue
            logger.info(
                f"Enhancing column lineage for {len(tables_missing_columns)} tables in connector {connector.connector_id}"
            )
            # Get schemas if we don't have them already.
            schemas = self.api_client.list_connector_schemas(connector.connector_id)
            if not schemas:
                continue
            # Make sure we have column info.
            self._preload_all_columns(connector.connector_id, schemas)
            # Process each table missing columns.
            self._process_tables_missing_columns(
                connector, tables_missing_columns, schemas
            )
        except Exception as e:
            logger.error(
                f"Error enhancing column lineage for connector {connector.connector_id}: {e}",
                exc_info=True,
            )
def _find_tables_missing_columns(
self, connector: Connector
) -> List[Tuple[int, TableLineage]]:
"""Find tables in a connector that don't have column lineage."""
tables_missing_columns = []
for idx, table_lineage in enumerate(connector.lineage):
if not table_lineage.column_lineage:
# Track tables that need column lineage enhancement
tables_missing_columns.append((idx, table_lineage))
return tables_missing_columns
def _process_tables_missing_columns(
self,
connector: Connector,
tables_missing_columns: List[Tuple[int, TableLineage]],
schemas: List[Dict],
) -> None:
"""Process tables that are missing column information."""
for idx, table_lineage in tables_missing_columns:
source_table = table_lineage.source_table
if "." not in source_table:
continue
schema_name, table_name = source_table.split(".", 1)
# Find this table in the schemas
self._find_and_enhance_table_columns(
connector, idx, table_lineage, schema_name, table_name, schemas
)
def _find_and_enhance_table_columns(
self,
connector: Connector,
idx: int,
table_lineage: TableLineage,
schema_name: str,
table_name: str,
schemas: List[Dict],
) -> None:
"""Find a table in schemas and enhance its column information."""
for schema in schemas:
if schema.get("name") != schema_name:
continue
for table in schema.get("tables", []):
if not isinstance(table, dict) or table.get("name") != table_name:
continue
# Found matching table, extract column lineage
self._extract_and_set_column_lineage(
connector, idx, table_lineage, table
)
return # Found our table, no need to continue searching
def _extract_and_set_column_lineage(
self, connector: Connector, idx: int, table_lineage: TableLineage, table: Dict
) -> None:
"""Extract column lineage from table and set it on the lineage object."""
# Found matching table, extract column lineage
columns = table.get("columns", [])
if not columns:
return
# Determine destination platform
destination_platform = self._get_destination_platform(connector)
# Extract column lineage
column_lineage = self._extract_column_lineage_from_columns(
columns, destination_platform
)
if column_lineage:
logger.info(
f"Added {len(column_lineage)} columns to lineage for {table_lineage.source_table}"
)
connector.lineage[idx].column_lineage = column_lineage
def _extract_column_lineage_from_columns(
self, columns: List[Any], destination_platform: str
) -> List[ColumnLineage]:
"""Extract column lineage from a list of columns."""
column_lineage = []
is_bigquery = destination_platform.lower() == "bigquery"
if not isinstance(columns, list):
return column_lineage
for column in columns:
col_name = None
if isinstance(column, dict):
col_name = column.get("name")
elif isinstance(column, str):
col_name = column
if col_name and not col_name.startswith("_fivetran"):
# Get destination column name - prefer name_in_destination if available
dest_col_name = None
if isinstance(column, dict) and "name_in_destination" in column:
dest_col_name = column.get("name_in_destination")
# If no name_in_destination, transform based on platform
if not dest_col_name:
dest_col_name = self._transform_column_name_for_platform(
col_name, is_bigquery
)
column_lineage.append(
ColumnLineage(
source_column=col_name, destination_column=dest_col_name
)
)
return column_lineage
# NOTE(review): this span is a diff rendering that interleaves TWO revisions
# of this method without +/- markers: the refactored body that delegates to
# _process_schema_for_synthetic_lineage (just below) and the older inline
# per-table loop (after it). They cannot both be live code — reconstruct from
# the original repository before editing. Indentation was also stripped by
# the extraction, so this text does not parse as Python as-is.
def _create_synthetic_lineage(
self, connector: Connector, schemas: List[dict], destination_platform: str
) -> None:
"""Create synthetic lineage for a connector based just on schema and table information."""
lineage_list = []
# Process each schema and its tables
for schema in schemas:
schema_name = schema.get("name", "")
if not schema_name:
continue
# NOTE(review): revision A — delegate to the helper per schema.
lineage_entries = self._process_schema_for_synthetic_lineage(
schema, schema_name, destination_platform
)
lineage_list.extend(lineage_entries)
# NOTE(review): revision B — the older inline per-table loop begins here;
# presumably it was replaced by the helper call above. Verify against VCS.
for table in schema.get("tables", []):
if not isinstance(table, dict):
continue
table_name = table.get("name", "")
if not table_name or not table.get("enabled", True):
continue
# Create source table identifier
source_table = f"{schema_name}.{table_name}"
# Get destination names
dest_schema = self._get_destination_schema_name(
schema_name, destination_platform
)
dest_table = self._get_destination_table_name(
table_name, destination_platform
)
destination_table = f"{dest_schema}.{dest_table}"
# Create synthetic column lineage if we have column info
column_lineage = []
columns = table.get("columns", [])
if isinstance(columns, list):
for column in columns:
col_name = None
if isinstance(column, dict):
col_name = column.get("name")
elif isinstance(column, str):
col_name = column
if col_name and not col_name.startswith("_fivetran"):
is_bigquery = destination_platform.lower() == "bigquery"
dest_col = self._transform_column_name_for_platform(
col_name, is_bigquery
)
column_lineage.append(
ColumnLineage(
source_column=col_name, destination_column=dest_col
)
)
# Add this table's lineage
lineage_list.append(
TableLineage(
source_table=source_table,
destination_table=destination_table,
column_lineage=column_lineage,
)
)
if lineage_list:
logger.info(
f"Created {len(lineage_list)} synthetic table lineage entries for connector {connector.connector_id}"
)
# Store the lineage in the connector
# Set the lineage directly on the connector instead of using _lineage_cache
connector.lineage = lineage_list
def _process_schema_for_synthetic_lineage(
self, schema: Dict, schema_name: str, destination_platform: str
) -> List[TableLineage]:
"""Process a schema to create synthetic lineage entries."""
lineage_entries = []
tables = schema.get("tables", [])
if not isinstance(tables, list):
return lineage_entries
for table in tables:
if not isinstance(table, dict):
continue
lineage_entry = self._process_table_for_synthetic_lineage(
table, schema, schema_name, destination_platform
)
if lineage_entry:
lineage_entries.append(lineage_entry)
return lineage_entries
def _process_table_for_synthetic_lineage(
self, table: Dict, schema: Dict, schema_name: str, destination_platform: str
) -> Optional[TableLineage]:
"""Process a table to create a synthetic lineage entry."""
table_name = table.get("name", "")
if not table_name or not table.get("enabled", True):
return None
# Create source and destination table identifiers
source_table, destination_table = self._create_source_dest_table_names(
table, schema, schema_name, table_name, destination_platform
)
# Create column lineage
column_lineage = self._create_synthetic_column_lineage(
table, source_table, destination_platform
)
# Add this table's lineage if we have table information
if source_table and destination_table:
logger.info(
f"Creating lineage: {source_table} -> {destination_table} with {len(column_lineage)} columns"
)
return TableLineage(
source_table=source_table,
destination_table=destination_table,
column_lineage=column_lineage,
)
return None
def _create_source_dest_table_names(
self,
table: Dict,
schema: Dict,
schema_name: str,
table_name: str,
destination_platform: str,
) -> Tuple[str, str]:
"""Create source and destination table names with proper casing and naming conventions."""
# Create source table identifier
source_table = f"{schema_name}.{table_name}"
# Get destination names - using name_in_destination if available
schema_name_in_destination = schema.get("name_in_destination")
dest_schema = (
schema_name_in_destination
if schema_name_in_destination
else self._get_destination_schema_name(schema_name, destination_platform)
)
table_name_in_destination = table.get("name_in_destination")
dest_table = (
table_name_in_destination
if table_name_in_destination
else self._get_destination_table_name(table_name, destination_platform)
)
destination_table = f"{dest_schema}.{dest_table}"
return source_table, destination_table
def _create_synthetic_column_lineage(
    self, table: Dict, source_table: str, destination_platform: str
) -> List[ColumnLineage]:
    """Create synthetic column lineage for a table.

    Tries list-format columns, then dict-format columns, then additional
    discovery sources; returns whatever the first successful source yields.
    """
    column_lineage = []
    # First try with list format columns.
    columns = table.get("columns", [])
    if isinstance(columns, list):
        column_lineage = self._process_list_columns_for_lineage(
            columns, destination_platform
        )
    # Try to extract columns from dict format if no columns found yet.
    if not column_lineage and isinstance(table.get("columns"), dict):
        column_dict = table.get("columns", {})
        column_lineage = self._process_dict_columns_for_lineage(
            column_dict, destination_platform
        )
    # If we still don't have columns but we know the source table,
    # try to get columns from API or similar tables.
    if not column_lineage:
        logger.info(
            f"No columns found for {source_table}, attempting additional discovery"
        )
        column_lineage = self._get_columns_from_additional_sources(
            source_table, destination_platform
        )
    return column_lineage
def _process_list_columns_for_lineage(
self, columns: List[Any], destination_platform: str
) -> List[ColumnLineage]:
"""Process list format columns for lineage."""
column_lineage = []
for column in columns:
if not column:
continue
col_name = None
if isinstance(column, dict):
col_name = column.get("name")
elif isinstance(column, str):
col_name = column
if not col_name or col_name.startswith("_fivetran"):
continue
# Get the destination column name
dest_col_name = self._get_destination_column_name(
column, col_name, destination_platform
)
# Add to lineage
column_lineage.append(
ColumnLineage(source_column=col_name, destination_column=dest_col_name)
)
return column_lineage
def _get_destination_column_name(
    self, column: Any, col_name: str, destination_platform: str
) -> str:
    """Get destination column name with proper handling of name_in_destination.

    Falls back to a platform-specific transform when no explicit mapping
    exists (or maps to a falsy value).
    """
    # First check for name_in_destination.
    dest_col_name = None
    if isinstance(column, dict) and "name_in_destination" in column:
        dest_col_name = column.get("name_in_destination")
        logger.debug(
            f"Using name_in_destination {dest_col_name} for column {col_name}"
        )
    # If no explicit mapping, transform based on destination platform.
    if not dest_col_name:
        is_bigquery = destination_platform.lower() == "bigquery"
        dest_col_name = self._transform_column_name_for_platform(
            col_name, is_bigquery
        )
        logger.debug(f"Transformed column name: {col_name} -> {dest_col_name}")
    return dest_col_name
def _process_dict_columns_for_lineage(
    self, column_dict: Dict, destination_platform: str
) -> List[ColumnLineage]:
    """Process dictionary-format columns for lineage.

    Skips Fivetran system columns (``_fivetran*``); prefers each entry's
    ``name_in_destination`` when present.
    """
    column_lineage = []
    logger.info("Extracting columns from dictionary format")
    for col_name, col_info in column_dict.items():
        if col_name.startswith("_fivetran"):
            continue
        # Get destination column name.
        dest_col_name = None
        if isinstance(col_info, dict) and "name_in_destination" in col_info:
            dest_col_name = col_info.get("name_in_destination")
        if not dest_col_name:
            is_bigquery = destination_platform.lower() == "bigquery"
            dest_col_name = self._transform_column_name_for_platform(
                col_name, is_bigquery
            )
        column_lineage.append(
            ColumnLineage(source_column=col_name, destination_column=dest_col_name)
        )
    return column_lineage
def _get_columns_from_additional_sources(
    self, source_table: str, destination_platform: str
) -> List[ColumnLineage]:
    """Try to get columns from additional sources when normal methods fail.

    Currently queries the Fivetran API via ``_get_columns_from_api`` and maps
    the results to lineage entries; system columns are skipped.
    """
    column_lineage = []
    # Try to infer from API.
    columns_from_api = self._get_columns_from_api(source_table)
    if columns_from_api:
        logger.info(
            f"Found {len(columns_from_api)} columns from API for {source_table}"
        )
        is_bigquery = destination_platform.lower() == "bigquery"
        for col in columns_from_api:
            col_name = col.get("name") if isinstance(col, dict) else col
            if col_name and not col_name.startswith("_fivetran"):
                dest_col_name = self._transform_column_name_for_platform(
                    col_name, is_bigquery
                )
                column_lineage.append(
                    ColumnLineage(
                        source_column=col_name, destination_column=dest_col_name
                    )
                )
    return column_lineage
def _process_connector(
self,
api_connector: Dict,
@ -1356,13 +1018,8 @@ class FivetranStandardAPI(FivetranAccessInterface):
return column_lineage
def _get_columns_from_api(
self, source_table: str, connector_id: str = None
) -> List[Dict]:
"""
Get columns directly from Fivetran API for a table.
Enhanced to use provided connector_id when available.
"""
def _get_columns_from_api(self, source_table: str) -> List[Dict]:
"""Get columns directly from Fivetran API for a table."""
# Parse schema and table name
if "." not in source_table:
logger.warning(
@ -1372,24 +1029,23 @@ class FivetranStandardAPI(FivetranAccessInterface):
schema_name, table_name = source_table.split(".", 1)
# Use provided connector_id or try to find it
conn_id = connector_id
if not conn_id:
conn_id = self._find_connector_id_for_source_table(source_table)
if not conn_id:
# Find the connector ID for this source table
connector_id = self._find_connector_id_for_source_table(source_table)
if not connector_id:
logger.warning(
f"Could not find connector ID for source table {source_table}"
)
return []
logger.info(f"Using connector ID {conn_id} for source table {source_table}")
logger.info(
f"Found connector ID {connector_id} for source table {source_table}"
)
logger.info(f"Querying API for columns of {schema_name}.{table_name}")
# Call the API to get columns using the direct columns endpoint
try:
columns = self.api_client.get_table_columns(
conn_id, schema_name, table_name
connector_id, schema_name, table_name
)
if columns:
logger.info(