diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index e1a1bec34e..7105dda25c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -1,6 +1,5 @@ import logging -import re -from typing import Any, Dict, Iterable, List, Optional +from typing import Dict, Iterable, List, Optional import datahub.emitter.mce_builder as builder from datahub.api.entities.datajob import DataFlow, DataJob @@ -26,12 +25,7 @@ from datahub.ingestion.source.fivetran.config import ( FivetranSourceReport, PlatformDetail, ) -from datahub.ingestion.source.fivetran.data_classes import ( - ColumnLineage, - Connector, - Job, - TableLineage, -) +from datahub.ingestion.source.fivetran.data_classes import Connector, Job, TableLineage from datahub.ingestion.source.fivetran.fivetran_access import ( create_fivetran_access, ) @@ -1145,270 +1139,6 @@ class FivetranSource(StatefulIngestionSourceBase): dpi = self._generate_dpi_from_job(job, datajob) yield from self._get_dpi_workunits(job, dpi) - def _enhance_missing_column_lineage(self, connector: Connector) -> None: - """ - Last-resort attempt to add column lineage to a connector's tables. - This tries various methods to find column information when all else has failed. - """ - if not connector.lineage: - return - - # Keep track of tables we've enhanced - enhanced_tables = 0 - - # Process each table that doesn't have column lineage - for idx, table_lineage in enumerate(connector.lineage): - if table_lineage.column_lineage: - continue # Skip tables that already have column lineage - - # Try to enhance this specific table's column lineage - if self._enhance_single_table_column_lineage(connector, idx, table_lineage): - enhanced_tables += 1 - - if enhanced_tables > 0: - logger.info( - f"Enhanced column lineage for {enhanced_tables} tables in connector {connector.connector_id}" - ) - else: - logger.warning( - f"Could not enhance column lineage for any tables in connector {connector.connector_id}" - ) - - def _enhance_single_table_column_lineage( - self, connector: Connector, idx: int, table_lineage: TableLineage - ) -> bool: - """Helper method to enhance column lineage for a single table to reduce complexity.""" - source_table = table_lineage.source_table - destination_table = table_lineage.destination_table - - if not source_table or not destination_table or "." not in source_table: - return False - - schema_name, table_name = source_table.split(".", 1) - - # Get destination platform - destination_platform = connector.additional_properties.get( - "destination_platform", "unknown" - ) - - try: - # Check if we're using the standard API implementation - from datahub.ingestion.source.fivetran.fivetran_standard_api import ( - FivetranStandardAPI, - ) - - if isinstance(self.fivetran_access, FivetranStandardAPI): - # Use the standard API's get_columns_from_api method - columns = self.fivetran_access._get_columns_from_api( - source_table, connector.connector_id - ) - if columns: - column_lineage = self._create_columns_lineage_from_data( - columns, destination_platform - ) - if column_lineage: - connector.lineage[idx].column_lineage = column_lineage - logger.info( - f"Added {len(column_lineage)} columns to lineage for {table_lineage.source_table} -> {table_lineage.destination_table}" - ) - return True - - # Try a different approach - use the connector schemas method - # This should work with any implementation of FivetranAccessInterface - schemas = [] - try: - # We need to get the schemas from the appropriate API - # Try to use list_connector_schemas indirectly through the fivetran_access - if hasattr(self.fivetran_access, "api_client") and hasattr( - self.fivetran_access.api_client, "list_connector_schemas" - ): - schemas = self.fivetran_access.api_client.list_connector_schemas( - connector.connector_id - ) - else: - # Depending on your implementation, there might be other ways to get schemas - # For now, we'll just log and continue - logger.warning( - f"No suitable method found to get schemas for connector {connector.connector_id}" - ) - except Exception as schema_e: - logger.warning( - f"Error getting schemas for connector {connector.connector_id}: {schema_e}" - ) - - if schemas: - return self._find_and_add_column_lineage( - connector, - idx, - schemas, - schema_name, - table_name, - destination_platform, - table_lineage, - ) - except Exception as e: - logger.warning(f"Error enhancing column lineage for {source_table}: {e}") - - return False - - def _find_and_add_column_lineage( - self, - connector: Connector, - idx: int, - schemas: List[Dict], - schema_name: str, - table_name: str, - destination_platform: str, - table_lineage: TableLineage, - ) -> bool: - """Find table in schemas and add column lineage if found.""" - for schema in schemas: - if schema.get("name") != schema_name: - continue - - for table in schema.get("tables", []): - if not isinstance(table, dict) or table.get("name") != table_name: - continue - - # Found the table, try to get columns - columns = self._get_columns_for_table( - connector, schema_name, table_name, table - ) - if not columns: - continue - - # Create column lineage - column_lineage = self._create_columns_lineage_from_data( - columns, destination_platform - ) - - if column_lineage: - connector.lineage[idx].column_lineage = column_lineage - logger.info( - f"Added {len(column_lineage)} columns to lineage for {table_lineage.source_table} -> {table_lineage.destination_table}" - ) - return True - - return False - - def _get_columns_for_table( - self, connector: Connector, schema_name: str, table_name: str, table: Dict - ) -> List[Any]: - """Get columns for a table, trying multiple methods.""" - # First try columns in table data - columns = table.get("columns", []) - if columns: - return columns - - # Try direct API call as fallback - need to handle the interface correctly - try: - # Check if we're using the standard API implementation - from datahub.ingestion.source.fivetran.fivetran_standard_api import ( - FivetranStandardAPI, - ) - - if isinstance(self.fivetran_access, FivetranStandardAPI): - # Use the standard API's method to get columns - return self.fivetran_access._get_columns_from_api( - f"{schema_name}.{table_name}", connector.connector_id - ) - - # If not using standard API, try alternative approach - if hasattr(self.fivetran_access, "api_client") and hasattr( - self.fivetran_access.api_client, "get_table_columns" - ): - columns = self.fivetran_access.api_client.get_table_columns( - connector.connector_id, schema_name, table_name - ) - if columns: - return columns - except Exception as e: - logger.debug(f"Failed to get columns via API: {e}") - - return [] - - def _create_columns_lineage_from_data( - self, columns: List[Any], destination_platform: str - ) -> List[ColumnLineage]: - """Create column lineage objects from raw column data.""" - column_lineage = [] - is_bigquery = destination_platform.lower() == "bigquery" - - # Process columns based on format - if isinstance(columns, list): - for column in columns: - col_name = self._extract_column_name(column) - if not col_name or col_name.startswith("_fivetran"): - continue - - # Get destination column name - dest_col_name = self._get_destination_column_name( - column, col_name, is_bigquery - ) - - column_lineage.append( - ColumnLineage( - source_column=col_name, destination_column=dest_col_name - ) - ) - # Handle dictionary format - elif isinstance(columns, dict): - for col_name, col_data in columns.items(): - if col_name.startswith("_fivetran"): - continue - - # Get destination column name - dest_col_name = self._get_destination_column_name_from_dict( - col_data, col_name, is_bigquery - ) - - column_lineage.append( - ColumnLineage( - source_column=col_name, destination_column=dest_col_name - ) - ) - - return column_lineage - - def _extract_column_name(self, column: Any) -> Optional[str]: - """Extract column name from column data.""" - if isinstance(column, dict): - return column.get("name") - elif isinstance(column, str): - return column - return None - - def _get_destination_column_name( - self, column: Any, col_name: str, is_bigquery: bool - ) -> str: - """Get destination column name, preferring name_in_destination if available.""" - if isinstance(column, dict) and "name_in_destination" in column: - return column.get("name_in_destination") - else: - return self._transform_column_name_for_platform(col_name, is_bigquery) - - def _get_destination_column_name_from_dict( - self, col_data: Any, col_name: str, is_bigquery: bool - ) -> str: - """Get destination column name from dictionary format column data.""" - if isinstance(col_data, dict) and "name_in_destination" in col_data: - return col_data.get("name_in_destination") - else: - return self._transform_column_name_for_platform(col_name, is_bigquery) - - def _transform_column_name_for_platform( - self, col_name: str, is_bigquery: bool - ) -> str: - """Transform column name based on destination platform.""" - if is_bigquery: - # For BigQuery, convert to snake_case - s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", col_name) - s2 = re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1) - return s2.lower() - else: - # For other platforms like Snowflake, typically uppercase - return col_name.upper() - def _get_connector_workunits( self, connector: Connector ) -> Iterable[MetadataWorkUnit]: @@ -1423,25 +1153,8 @@ class FivetranSource(StatefulIngestionSourceBase): # Store field lineage workunits to emit after dataset workunits field_lineage_workunits = [] - # We'll only consider a connector to have valid lineage if it has table lineage - # AND at least one of those tables has column lineage - has_column_lineage = any( - table_lineage.column_lineage for table_lineage in connector.lineage - ) - # Special handling for connectors with lineage but no job history if not connector.jobs and connector.lineage: - # Check if there's any column lineage to include - if not has_column_lineage: - logger.warning( - f"Connector {connector.connector_name} (ID: {connector.connector_id}) " - f"has {len(connector.lineage)} lineage entries but no column lineage. " - "Column-level lineage information will be missing." - ) - - # Try one last attempt to add column lineage - self._enhance_missing_column_lineage(connector) - logger.info( f"Connector {connector.connector_name} (ID: {connector.connector_id}) " f"has {len(connector.lineage)} lineage entries but no job history. " diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py index e72283e8d5..9512539d73 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py @@ -4,7 +4,7 @@ import difflib import logging import re -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import Dict, List, Optional, Set, Tuple from datahub.configuration.common import AllowDenyPattern from datahub.ingestion.source.fivetran.config import ( @@ -235,422 +235,84 @@ class FivetranStandardAPI(FivetranAccessInterface): def _fill_connectors_lineage(self, connectors: List[Connector]) -> None: """ - Fill in lineage information for all connectors with special attention to column lineage. + Fill in lineage information for connectors by calling the API with enhanced diagnostics. + Ensures every connector with schema information gets lineage. """ - # Cache connectors for later use self._connector_cache = connectors - # First process explicitly to make sure lineage is extracted for connector in connectors: self._fill_connector_lineage(connector) - # Perform second pass for column lineage enhancement - self._enhance_column_lineage_for_connectors(connectors) - - def _enhance_column_lineage_for_connectors( - self, connectors: List[Connector] - ) -> None: - """Enhance column lineage for connectors that have tables without column information.""" - logger.info("Performing secondary column lineage enhancement pass") - - for connector in connectors: - try: - if not connector.lineage: - continue - - # Find tables that need column lineage enhancement - tables_missing_columns = self._find_tables_missing_columns(connector) - if not tables_missing_columns: - continue - - logger.info( - f"Enhancing column lineage for {len(tables_missing_columns)} tables in connector {connector.connector_id}" - ) - - # Get schemas if we don't have them already - schemas = self.api_client.list_connector_schemas(connector.connector_id) - if not schemas: - continue - - # Make sure we have column info - self._preload_all_columns(connector.connector_id, schemas) - - # Process each table missing columns - self._process_tables_missing_columns( - connector, tables_missing_columns, schemas - ) - - except Exception as e: - logger.error( - f"Error enhancing column lineage for connector {connector.connector_id}: {e}", - exc_info=True, - ) - - def _find_tables_missing_columns( - self, connector: Connector - ) -> List[Tuple[int, TableLineage]]: - """Find tables in a connector that don't have column lineage.""" - tables_missing_columns = [] - for idx, table_lineage in enumerate(connector.lineage): - if not table_lineage.column_lineage: - # Track tables that need column lineage enhancement - tables_missing_columns.append((idx, table_lineage)) - return tables_missing_columns - - def _process_tables_missing_columns( - self, - connector: Connector, - tables_missing_columns: List[Tuple[int, TableLineage]], - schemas: List[Dict], - ) -> None: - """Process tables that are missing column information.""" - for idx, table_lineage in tables_missing_columns: - source_table = table_lineage.source_table - if "." not in source_table: - continue - - schema_name, table_name = source_table.split(".", 1) - - # Find this table in the schemas - self._find_and_enhance_table_columns( - connector, idx, table_lineage, schema_name, table_name, schemas - ) - - def _find_and_enhance_table_columns( - self, - connector: Connector, - idx: int, - table_lineage: TableLineage, - schema_name: str, - table_name: str, - schemas: List[Dict], - ) -> None: - """Find a table in schemas and enhance its column information.""" - for schema in schemas: - if schema.get("name") != schema_name: - continue - - for table in schema.get("tables", []): - if not isinstance(table, dict) or table.get("name") != table_name: - continue - - # Found matching table, extract column lineage - self._extract_and_set_column_lineage( - connector, idx, table_lineage, table - ) - return # Found our table, no need to continue searching - - def _extract_and_set_column_lineage( - self, connector: Connector, idx: int, table_lineage: TableLineage, table: Dict - ) -> None: - """Extract column lineage from table and set it on the lineage object.""" - # Found matching table, extract column lineage - columns = table.get("columns", []) - if not columns: - return - - # Determine destination platform - destination_platform = self._get_destination_platform(connector) - - # Extract column lineage - column_lineage = self._extract_column_lineage_from_columns( - columns, destination_platform - ) - - if column_lineage: - logger.info( - f"Added {len(column_lineage)} columns to lineage for {table_lineage.source_table}" - ) - connector.lineage[idx].column_lineage = column_lineage - - def _extract_column_lineage_from_columns( - self, columns: List[Any], destination_platform: str - ) -> List[ColumnLineage]: - """Extract column lineage from a list of columns.""" - column_lineage = [] - is_bigquery = destination_platform.lower() == "bigquery" - - if not isinstance(columns, list): - return column_lineage - - for column in columns: - col_name = None - if isinstance(column, dict): - col_name = column.get("name") - elif isinstance(column, str): - col_name = column - - if col_name and not col_name.startswith("_fivetran"): - # Get destination column name - prefer name_in_destination if available - dest_col_name = None - if isinstance(column, dict) and "name_in_destination" in column: - dest_col_name = column.get("name_in_destination") - - # If no name_in_destination, transform based on platform - if not dest_col_name: - dest_col_name = self._transform_column_name_for_platform( - col_name, is_bigquery - ) - - column_lineage.append( - ColumnLineage( - source_column=col_name, destination_column=dest_col_name - ) - ) - - return column_lineage - def _create_synthetic_lineage( self, connector: Connector, schemas: List[dict], destination_platform: str ) -> None: """Create synthetic lineage for a connector based just on schema and table information.""" lineage_list = [] - # Process each schema and its tables for schema in schemas: schema_name = schema.get("name", "") if not schema_name: continue - lineage_entries = self._process_schema_for_synthetic_lineage( - schema, schema_name, destination_platform - ) - lineage_list.extend(lineage_entries) + for table in schema.get("tables", []): + if not isinstance(table, dict): + continue + + table_name = table.get("name", "") + if not table_name or not table.get("enabled", True): + continue + + # Create source table identifier + source_table = f"{schema_name}.{table_name}" + + # Get destination names + dest_schema = self._get_destination_schema_name( + schema_name, destination_platform + ) + dest_table = self._get_destination_table_name( + table_name, destination_platform + ) + destination_table = f"{dest_schema}.{dest_table}" + + # Create synthetic column lineage if we have column info + column_lineage = [] + columns = table.get("columns", []) + + if isinstance(columns, list): + for column in columns: + col_name = None + if isinstance(column, dict): + col_name = column.get("name") + elif isinstance(column, str): + col_name = column + + if col_name and not col_name.startswith("_fivetran"): + is_bigquery = destination_platform.lower() == "bigquery" + dest_col = self._transform_column_name_for_platform( + col_name, is_bigquery + ) + column_lineage.append( + ColumnLineage( + source_column=col_name, destination_column=dest_col + ) + ) + + # Add this table's lineage + lineage_list.append( + TableLineage( + source_table=source_table, + destination_table=destination_table, + column_lineage=column_lineage, + ) + ) if lineage_list: logger.info( f"Created {len(lineage_list)} synthetic table lineage entries for connector {connector.connector_id}" ) - # Store the lineage in the connector + # Set the lineage directly on the connector instead of using _lineage_cache connector.lineage = lineage_list - def _process_schema_for_synthetic_lineage( - self, schema: Dict, schema_name: str, destination_platform: str - ) -> List[TableLineage]: - """Process a schema to create synthetic lineage entries.""" - lineage_entries = [] - - tables = schema.get("tables", []) - if not isinstance(tables, list): - return lineage_entries - - for table in tables: - if not isinstance(table, dict): - continue - - lineage_entry = self._process_table_for_synthetic_lineage( - table, schema, schema_name, destination_platform - ) - - if lineage_entry: - lineage_entries.append(lineage_entry) - - return lineage_entries - - def _process_table_for_synthetic_lineage( - self, table: Dict, schema: Dict, schema_name: str, destination_platform: str - ) -> Optional[TableLineage]: - """Process a table to create a synthetic lineage entry.""" - table_name = table.get("name", "") - if not table_name or not table.get("enabled", True): - return None - - # Create source and destination table identifiers - source_table, destination_table = self._create_source_dest_table_names( - table, schema, schema_name, table_name, destination_platform - ) - - # Create column lineage - column_lineage = self._create_synthetic_column_lineage( - table, source_table, destination_platform - ) - - # Add this table's lineage if we have table information - if source_table and destination_table: - logger.info( - f"Creating lineage: {source_table} -> {destination_table} with {len(column_lineage)} columns" - ) - return TableLineage( - source_table=source_table, - destination_table=destination_table, - column_lineage=column_lineage, - ) - - return None - - def _create_source_dest_table_names( - self, - table: Dict, - schema: Dict, - schema_name: str, - table_name: str, - destination_platform: str, - ) -> Tuple[str, str]: - """Create source and destination table names with proper casing and naming conventions.""" - # Create source table identifier - source_table = f"{schema_name}.{table_name}" - - # Get destination names - using name_in_destination if available - schema_name_in_destination = schema.get("name_in_destination") - dest_schema = ( - schema_name_in_destination - if schema_name_in_destination - else self._get_destination_schema_name(schema_name, destination_platform) - ) - - table_name_in_destination = table.get("name_in_destination") - dest_table = ( - table_name_in_destination - if table_name_in_destination - else self._get_destination_table_name(table_name, destination_platform) - ) - - destination_table = f"{dest_schema}.{dest_table}" - - return source_table, destination_table - - def _create_synthetic_column_lineage( - self, table: Dict, source_table: str, destination_platform: str - ) -> List[ColumnLineage]: - """Create synthetic column lineage for a table.""" - column_lineage = [] - - # First try with list format columns - columns = table.get("columns", []) - if isinstance(columns, list): - column_lineage = self._process_list_columns_for_lineage( - columns, destination_platform - ) - - # Try to extract columns from dict format if no columns found yet - if not column_lineage and isinstance(table.get("columns"), dict): - column_dict = table.get("columns", {}) - column_lineage = self._process_dict_columns_for_lineage( - column_dict, destination_platform - ) - - # If we still don't have columns but we know the source table, - # try to get columns from API or similar tables - if not column_lineage: - logger.info( - f"No columns found for {source_table}, attempting additional discovery" - ) - column_lineage = self._get_columns_from_additional_sources( - source_table, destination_platform - ) - - return column_lineage - - def _process_list_columns_for_lineage( - self, columns: List[Any], destination_platform: str - ) -> List[ColumnLineage]: - """Process list format columns for lineage.""" - column_lineage = [] - - for column in columns: - if not column: - continue - - col_name = None - if isinstance(column, dict): - col_name = column.get("name") - elif isinstance(column, str): - col_name = column - - if not col_name or col_name.startswith("_fivetran"): - continue - - # Get the destination column name - dest_col_name = self._get_destination_column_name( - column, col_name, destination_platform - ) - - # Add to lineage - column_lineage.append( - ColumnLineage(source_column=col_name, destination_column=dest_col_name) - ) - - return column_lineage - - def _get_destination_column_name( - self, column: Any, col_name: str, destination_platform: str - ) -> str: - """Get destination column name with proper handling of name_in_destination.""" - # First check for name_in_destination - dest_col_name = None - if isinstance(column, dict) and "name_in_destination" in column: - dest_col_name = column.get("name_in_destination") - logger.debug( - f"Using name_in_destination {dest_col_name} for column {col_name}" - ) - - # If no explicit mapping, transform based on destination platform - if not dest_col_name: - is_bigquery = destination_platform.lower() == "bigquery" - dest_col_name = self._transform_column_name_for_platform( - col_name, is_bigquery - ) - logger.debug(f"Transformed column name: {col_name} -> {dest_col_name}") - - return dest_col_name - - def _process_dict_columns_for_lineage( - self, column_dict: Dict, destination_platform: str - ) -> List[ColumnLineage]: - """Process dictionary format columns for lineage.""" - column_lineage = [] - - logger.info("Extracting columns from dictionary format") - - for col_name, col_info in column_dict.items(): - if col_name.startswith("_fivetran"): - continue - - # Get destination column name - dest_col_name = None - if isinstance(col_info, dict) and "name_in_destination" in col_info: - dest_col_name = col_info.get("name_in_destination") - - if not dest_col_name: - is_bigquery = destination_platform.lower() == "bigquery" - dest_col_name = self._transform_column_name_for_platform( - col_name, is_bigquery - ) - - column_lineage.append( - ColumnLineage(source_column=col_name, destination_column=dest_col_name) - ) - - return column_lineage - - def _get_columns_from_additional_sources( - self, source_table: str, destination_platform: str - ) -> List[ColumnLineage]: - """Try to get columns from additional sources when normal methods fail.""" - column_lineage = [] - - # Try to infer from API - columns_from_api = self._get_columns_from_api(source_table) - if columns_from_api: - logger.info( - f"Found {len(columns_from_api)} columns from API for {source_table}" - ) - - is_bigquery = destination_platform.lower() == "bigquery" - for col in columns_from_api: - col_name = col.get("name") if isinstance(col, dict) else col - if col_name and not col_name.startswith("_fivetran"): - dest_col_name = self._transform_column_name_for_platform( - col_name, is_bigquery - ) - column_lineage.append( - ColumnLineage( - source_column=col_name, destination_column=dest_col_name - ) - ) - - return column_lineage - def _process_connector( self, api_connector: Dict, @@ -1356,13 +1018,8 @@ class FivetranStandardAPI(FivetranAccessInterface): return column_lineage - def _get_columns_from_api( - self, source_table: str, connector_id: str = None - ) -> List[Dict]: - """ - Get columns directly from Fivetran API for a table. - Enhanced to use provided connector_id when available. - """ + def _get_columns_from_api(self, source_table: str) -> List[Dict]: + """Get columns directly from Fivetran API for a table.""" # Parse schema and table name if "." not in source_table: logger.warning( @@ -1372,24 +1029,23 @@ class FivetranStandardAPI(FivetranAccessInterface): schema_name, table_name = source_table.split(".", 1) - # Use provided connector_id or try to find it - conn_id = connector_id - if not conn_id: - conn_id = self._find_connector_id_for_source_table(source_table) - - if not conn_id: + # Find the connector ID for this source table + connector_id = self._find_connector_id_for_source_table(source_table) + if not connector_id: logger.warning( f"Could not find connector ID for source table {source_table}" ) return [] - logger.info(f"Using connector ID {conn_id} for source table {source_table}") + logger.info( + f"Found connector ID {connector_id} for source table {source_table}" + ) logger.info(f"Querying API for columns of {schema_name}.{table_name}") # Call the API to get columns using the direct columns endpoint try: columns = self.api_client.get_table_columns( - conn_id, schema_name, table_name + connector_id, schema_name, table_name ) if columns: logger.info(