diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_api_client.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_api_client.py
index 428784dda7..69eb756085 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_api_client.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_api_client.py
@@ -163,7 +163,7 @@ class FivetranAPIClient:
         self, connector_id: str, schema_name: str, table_name: str
     ) -> List[Dict]:
         """
-        Get detailed column information for a specific table by directly querying the columns endpoint.
+        Get detailed column information for a specific table using the direct columns endpoint.
 
         Args:
             connector_id: The Fivetran connector ID
@@ -189,7 +189,7 @@ class FivetranAPIClient:
             response = self._make_request("GET", url)
 
             # Extract columns from the response
-            columns_data = response.get("data", {}).get("items", [])
+            columns_data = response.get("data", {}).get("columns", [])
 
             logger.info(
                 f"Retrieved {len(columns_data)} columns directly from columns endpoint for {schema_name}.{table_name}"
@@ -200,39 +200,6 @@ class FivetranAPIClient:
             logger.warning(
                 f"Failed to get columns from direct endpoint for {schema_name}.{table_name}: {e}"
             )
-
-        # Try fallback approaches if the direct endpoint fails
-        try:
-            # First try the tables endpoint
-            encoded_schema = urllib.parse.quote(schema_name)
-            url = f"/connectors/{connector_id}/schemas/{encoded_schema}/tables"
-            response = self._make_request("GET", url)
-
-            # Find the requested table
-            tables_data = response.get("data", {}).get("items", [])
-            for table in tables_data:
-                if table.get("name") == table_name:
-                    columns = table.get("columns", [])
-                    logger.info(
-                        f"Found {len(columns)} columns from tables endpoint for {schema_name}.{table_name}"
-                    )
-                    return columns
-
-            # If still no columns, try the full schemas endpoint
-            schemas = self.list_connector_schemas(connector_id)
-            for schema in schemas:
-                if schema.get("name") == schema_name:
-                    for table in schema.get("tables", []):
-                        if table.get("name") == table_name:
-                            columns = table.get("columns", [])
-                            logger.info(
-                                f"Found {len(columns)} columns from schemas endpoint for {schema_name}.{table_name}"
-                            )
-                            return columns
-
-        except Exception as fallback_error:
-            logger.warning(f"Fallback attempts also failed: {fallback_error}")
-
         return []
 
     def list_connector_schemas(self, connector_id: str) -> List[Dict]:
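For context on the items -> columns key fix above, a minimal sketch of the parsing it implies. The helper name and exact payload shape are assumptions for illustration, not part of the Fivetran client: the endpoint is assumed to nest its payload under data.columns, which may arrive as either a list of dicts or a dict keyed by column name (the same two shapes the lineage code below normalizes).

from typing import Dict, List


def parse_columns_response(response: Dict) -> List[Dict]:
    # Hypothetical helper: extract column entries from a columns-endpoint
    # payload assumed to be shaped like {"data": {"columns": ...}}.
    columns = response.get("data", {}).get("columns", [])
    if isinstance(columns, dict):
        # Dict keyed by column name -> normalize to a list of dicts.
        return [
            {"name": name, **(meta if isinstance(meta, dict) else {})}
            for name, meta in columns.items()
        ]
    return columns


sample = {"data": {"columns": [{"name": "id", "name_in_destination": "ID"}]}}
assert parse_columns_response(sample) == [{"name": "id", "name_in_destination": "ID"}]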
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py
index c9295ad8fd..2d89af7453 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py
@@ -704,27 +704,104 @@ class FivetranStandardAPI(FivetranAccessInterface):
         destination_platform: str,
         source_table_columns: Dict[str, Dict[str, str]],
     ) -> List[ColumnLineage]:
-        """Extract column lineage with enhanced retrieval and mapping."""
+        """
+        Extract column-level lineage for a table with improved API integration.
+
+        Args:
+            table: Table data from API
+            source_table: Full source table name (schema.table)
+            destination_platform: Destination platform type
+            source_table_columns: Dict mapping table names to column information
+
+        Returns:
+            List of ColumnLineage objects
+        """
         logger.info(
             f"Extracting column lineage for {source_table} to {destination_platform}"
         )
+        logger.debug(f"Table data keys: {list(table.keys())}")
 
-        # Get columns from different sources
-        columns = self._get_columns_from_sources(
-            table, source_table, source_table_columns
-        )
+        # 1. First try to get columns from the table data
+        columns = table.get("columns", [])
+
+        # Handle different column formats
+        if isinstance(columns, dict):
+            # Convert dict format to list
+            columns_list = []
+            for col_name, col_data in columns.items():
+                if isinstance(col_data, dict):
+                    col_data = col_data.copy()
+                    col_data["name"] = col_name
+                    columns_list.append(col_data)
+                else:
+                    columns_list.append({"name": col_name})
+            columns = columns_list
+            logger.info(f"Converted dict format to list with {len(columns)} columns")
+
+        # 2. If no columns found, try to retrieve them directly from the API columns endpoint
+        if not columns:
+            logger.info(
+                f"No columns in table data, trying direct API query for {source_table}"
+            )
+            columns = self._get_columns_from_api(source_table)
+
+        # 3. If still no columns, try source_table_columns
+        if not columns and source_table in source_table_columns:
+            logger.info(f"Using columns from source_table_columns for {source_table}")
+            columns = [
+                {"name": col_name, "type": col_type}
+                for col_name, col_type in source_table_columns[source_table].items()
+            ]
+
+        # Now create column lineage from the columns we have
+        column_lineage = []
+        is_bigquery = destination_platform.lower() == "bigquery"
 
-        # If we still don't have any columns, return empty list
         if not columns:
             logger.warning(f"No column information available for {source_table}")
             return []
 
-        # Create column lineage
-        column_lineage = self._create_column_lineage_from_columns(
-            columns, source_table, destination_platform
-        )
+        for column in columns:
+            col_name = None
+            if isinstance(column, dict):
+                col_name = column.get("name")
+            elif isinstance(column, str):
+                col_name = column
+
+            if not col_name or col_name.startswith("_fivetran"):
+                continue
+
+            # Get destination column name - prefer name_in_destination if available
+            dest_col_name = None
+            if isinstance(column, dict) and "name_in_destination" in column:
+                dest_col_name = column.get("name_in_destination")
+                logger.debug(
+                    f"Using name_in_destination: {col_name} -> {dest_col_name}"
+                )
+
+            # If no name_in_destination, transform based on platform
+            if not dest_col_name:
+                if is_bigquery:
+                    # For BigQuery, convert to snake_case
+                    dest_col_name = self._transform_column_name_for_platform(
+                        col_name, True
+                    )
+                else:
+                    # For other platforms like Snowflake, typically uppercase
+                    dest_col_name = self._transform_column_name_for_platform(
+                        col_name, False
+                    )
+
+                logger.debug(f"Transformed name: {col_name} -> {dest_col_name}")
+
+            # Add to lineage
+            column_lineage.append(
+                ColumnLineage(
+                    source_column=col_name,
+                    destination_column=dest_col_name,
+                )
+            )
 
-        # Log results
         if column_lineage:
             logger.info(
                 f"Created {len(column_lineage)} column lineage entries for {source_table}"
             )
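The rewrite above resolves columns in three ordered steps: columns embedded in the table payload, then a direct query against the columns endpoint, then the pre-fetched source_table_columns map. A standalone sketch of that resolution order under the same assumptions; resolve_columns and fetch_from_api are illustrative stand-ins, not names from this module.

from typing import Callable, Dict, List


def resolve_columns(
    table: Dict,
    source_table: str,
    source_table_columns: Dict[str, Dict[str, str]],
    fetch_from_api: Callable[[str], List[Dict]],
) -> List[Dict]:
    # 1. Columns embedded in the table payload (list or dict form).
    columns = table.get("columns", [])
    if isinstance(columns, dict):
        columns = [
            {**(meta if isinstance(meta, dict) else {}), "name": name}
            for name, meta in columns.items()
        ]
    # 2. Fall back to the direct columns endpoint.
    if not columns:
        columns = fetch_from_api(source_table)
    # 3. Finally, fall back to the pre-fetched column map.
    if not columns and source_table in source_table_columns:
        columns = [
            {"name": name, "type": type_}
            for name, type_ in source_table_columns[source_table].items()
        ]
    return columns


# Dict-form payloads normalize to the same list shape the lineage loop expects:
table = {"columns": {"id": {"name_in_destination": "ID"}}}
assert resolve_columns(table, "public.users", {}, lambda _: []) == [
    {"name_in_destination": "ID", "name": "id"}
]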
@@ -822,22 +899,46 @@ class FivetranStandardAPI(FivetranAccessInterface):
             )
             return []
 
+        logger.info(
+            f"Found connector ID {connector_id} for source table {source_table}"
+        )
+        logger.info(f"Querying API for columns of {schema_name}.{table_name}")
+
+        # Call the API to get columns using the direct columns endpoint
         try:
             columns = self.api_client.get_table_columns(
                 connector_id, schema_name, table_name
             )
-            logger.info(f"Retrieved {len(columns)} columns from API for {source_table}")
-            return columns
+            if columns:
+                logger.info(
+                    f"Retrieved {len(columns)} columns from API for {source_table}"
+                )
+                return columns
+            else:
+                logger.warning(f"No columns returned from API for {source_table}")
         except Exception as e:
             logger.warning(f"Failed to get columns from API for {source_table}: {e}")
-            return []
+
+        return []
 
     def _find_connector_id_for_source_table(self, source_table: str) -> Optional[str]:
         """Find the connector ID for a source table."""
         for conn in getattr(self, "_connector_cache", []):
-            if hasattr(conn, "connector_id") and source_table in str(conn.lineage):
+            if not hasattr(conn, "connector_id"):
+                continue
+
+            # Check in lineage
+            for lineage in getattr(conn, "lineage", []):
+                if (
+                    hasattr(lineage, "source_table")
+                    and lineage.source_table == source_table
+                ):
+                    return conn.connector_id
+
+            # Also check as substring in case formats don't match exactly
+            if source_table in str(conn.lineage):
                 return conn.connector_id
+        return None
 
     def _create_column_lineage_from_columns(
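The reworked _find_connector_id_for_source_table above now prefers an exact match on each lineage entry's source_table before falling back to the original substring check. A minimal runnable sketch of that two-pass lookup; ConnectorInfo and LineageEntry are simplified stand-ins for the real connector objects, not the module's classes.

from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class LineageEntry:
    source_table: str


@dataclass
class ConnectorInfo:
    connector_id: str
    lineage: List[LineageEntry] = field(default_factory=list)


def find_connector_id(
    connectors: List[ConnectorInfo], source_table: str
) -> Optional[str]:
    for conn in connectors:
        # First pass: exact match against each lineage entry's source table.
        for entry in conn.lineage:
            if entry.source_table == source_table:
                return conn.connector_id
        # Second pass: substring check, in case name formats differ slightly.
        if source_table in str(conn.lineage):
            return conn.connector_id
    return None


conns = [ConnectorInfo("abc123", [LineageEntry("public.users")])]
assert find_connector_id(conns, "public.users") == "abc123"
assert find_connector_id(conns, "public.orders") is None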