cll (column-level lineage) improvements
This commit is contained in:
parent 5642906be3
commit 0816bf3155
@@ -356,12 +356,16 @@ class FivetranSource(StatefulIngestionSourceBase):
    def _transform_column_name_for_platform(
        self, column_name: str, is_bigquery: bool
    ) -> str:
-        """Transform column name based on the destination platform."""
+        """Transform column name based on the destination platform with better handling of edge cases."""
+        if not column_name:
+            return ""
+
        if is_bigquery:
            # For BigQuery:
            # 1. Convert to lowercase
            # 2. Replace camelCase with snake_case
            # 3. Clean up any invalid characters
            import re

            # Step 1: Convert camelCase to snake_case with regex
            s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", column_name)
@@ -373,6 +377,10 @@ class FivetranSource(StatefulIngestionSourceBase):
            # Step 3: Remove leading/trailing underscores and collapse multiple underscores
            transformed = re.sub(r"_+", "_", transformed).strip("_")
+
+            # Log the transformation for debugging
+            if transformed != column_name.lower():
+                logger.debug(f"Transformed column: {column_name} -> {transformed}")

            return transformed
        else:
            # For other platforms like Snowflake, typically uppercase
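A minimal, self-contained sketch of the full BigQuery transform above, for quick verification outside the connector (the function name here is ours, not from the codebase):

import re

def to_bigquery_column(column_name: str) -> str:
    # Step 1: insert underscores at camelCase boundaries
    s1 = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", column_name)
    s2 = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s1)
    # Step 2: lowercase and replace invalid characters with underscores
    cleaned = re.sub(r"[^a-zA-Z0-9_]", "_", s2.lower())
    # Step 3: collapse runs of underscores and trim the ends
    return re.sub(r"_+", "_", cleaned).strip("_")

assert to_bigquery_column("AccountID") == "account_id"
assert to_bigquery_column("customerName") == "customer_name"
assert to_bigquery_column("Last.Modified") == "last_modified"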
@@ -444,135 +452,19 @@ class FivetranSource(StatefulIngestionSourceBase):
        dest_urn: Optional[DatasetUrn],
        fine_grained_lineage: List[FineGrainedLineage],
    ) -> None:
-        """Create column-level lineage between source and destination tables with fuzzy matching."""
+        """Create column-level lineage between source and destination tables with better diagnostics."""
        if not source_urn or not dest_urn:
            return

        # Log details for debugging
        logger.info(f"Creating column lineage from {source_urn} to {dest_urn}")

        # Get destination platform
        dest_platform = str(dest_urn).split(",")[0].split(":")[-1]
        is_bigquery = dest_platform.lower() == "bigquery"

        # If there are explicit column mappings, use them directly
        if lineage.column_lineage:
            # Extract and normalize all source and destination columns
            source_columns = []
            dest_columns = []
            original_mappings = {}

            for column_lineage in lineage.column_lineage:
                if (
                    not column_lineage.source_column
                    or not column_lineage.destination_column
                    or column_lineage.destination_column.startswith("_fivetran")
                ):
                    continue

                source_col = column_lineage.source_column
                dest_col = column_lineage.destination_column

                # Transform destination column based on platform
                transformed_dest = self._transform_column_name_for_platform(
                    dest_col, is_bigquery
                )

                # Store original and normalized versions
                source_norm = self._normalize_column_name(source_col)
                dest_norm = self._normalize_column_name(transformed_dest)

                source_columns.append((source_col, source_norm))
                dest_columns.append((transformed_dest, dest_norm))

                # Keep track of original mappings
                original_mappings[(source_col, dest_col)] = (
                    source_col,
                    transformed_dest,
                )

            # Apply fuzzy matching to find best matches where needed
            best_matches = {}

            for source_col, source_norm in source_columns:
                # First try exact match with normalized column name
                exact_match = None
                for dest_col, dest_norm in dest_columns:
                    if source_norm == dest_norm:
                        exact_match = dest_col
                        break

                if exact_match:
                    best_matches[source_col] = exact_match
                    continue

                # If no exact match, try fuzzy matching
                best_match = self._find_best_fuzzy_match(
                    source_col, source_norm, dest_columns
                )
                if best_match:
                    best_matches[source_col] = best_match
                    logger.info(f"Fuzzy matched: {source_col} -> {best_match}")

            # Create lineage for each matched column
            for source_col, dest_col in best_matches.items():
                try:
                    # Create field URNs
                    source_field_urn = builder.make_schema_field_urn(
                        str(source_urn),
                        source_col,
                    )

                    dest_field_urn = builder.make_schema_field_urn(
                        str(dest_urn),
                        dest_col,
                    )

                    # Add to fine-grained lineage
                    fine_grained_lineage.append(
                        FineGrainedLineage(
                            upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
                            upstreams=[source_field_urn],
                            downstreamType=FineGrainedLineageDownstreamType.FIELD,
                            downstreams=[dest_field_urn],
                        )
                    )

                    logger.debug(
                        f"Added field lineage: {source_field_urn} -> {dest_field_urn}"
                    )
                except Exception as e:
                    logger.warning(
                        f"Failed to create column lineage for {source_col} -> {dest_col}: {e}"
                    )

            # Log the total number of lineage entries created
            if fine_grained_lineage:
                logger.info(
                    f"Created {len(fine_grained_lineage)} field lineage entries for {source_urn} -> {dest_urn}"
                )
            else:
                logger.warning(
                    f"No valid column lineage mappings found for {source_urn} -> {dest_urn}"
                )
        else:
            # No column mappings provided - log a warning
            if not lineage.column_lineage:
                logger.warning(
-                    f"No column lineage data available for {lineage.source_table} -> {lineage.destination_table}. "
-                    f"This may indicate an issue with schema retrieval from the Fivetran API."
+                    f"No column lineage data available for {lineage.source_table} -> {lineage.destination_table}"
                )

                # Add a special note in the report
                self.report.warning(
                    title="Missing column lineage",
                    message=(
                        "No column lineage information was available for some tables. "
                        "This may indicate an issue with schema retrieval from the Fivetran API."
                    ),
                    context=f"{lineage.source_table} → {lineage.destination_table}",
                )

                # Add a placeholder entry to indicate table-level lineage only
                fine_grained_lineage.append(
                    FineGrainedLineage(
                        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
@@ -581,6 +473,68 @@ class FivetranSource(StatefulIngestionSourceBase):
                        downstreams=[str(dest_urn)],
                    )
                )
                return

        logger.info(f"Processing {len(lineage.column_lineage)} column mappings")

        valid_lineage = []
        for column_lineage in lineage.column_lineage:
            if (
                not column_lineage.source_column
                or not column_lineage.destination_column
            ):
                continue

            if column_lineage.destination_column.startswith("_fivetran"):
                continue

            valid_lineage.append(column_lineage)

        if not valid_lineage:
            logger.warning("No valid column mappings found after filtering")
            return

        # Process valid column mappings
        for column_lineage in valid_lineage:
            try:
                # Log what we're processing
                logger.debug(
                    f"Processing: {column_lineage.source_column} -> {column_lineage.destination_column}"
                )

                # Create field URNs
                source_field_urn = builder.make_schema_field_urn(
                    str(source_urn),
                    column_lineage.source_column,
                )

                # For BigQuery, ensure proper case and format
                dest_column = column_lineage.destination_column
                if is_bigquery:
                    dest_column = dest_column.lower()

                dest_field_urn = builder.make_schema_field_urn(
                    str(dest_urn),
                    dest_column,
                )

                # Add to fine-grained lineage
                fine_grained_lineage.append(
                    FineGrainedLineage(
                        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
                        upstreams=[source_field_urn],
                        downstreamType=FineGrainedLineageDownstreamType.FIELD,
                        downstreams=[dest_field_urn],
                    )
                )

                logger.debug(
                    f"Added field lineage: {source_field_urn} -> {dest_field_urn}"
                )
            except Exception as e:
                logger.warning(
                    f"Failed to create column lineage for {column_lineage.source_column} -> {column_lineage.destination_column}: {e}"
                )

    def _create_field_lineage_mcp(
        self,
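_normalize_column_name and _find_best_fuzzy_match are referenced by the fuzzy-matching path above but are not shown in this diff. A plausible standalone sketch of the idea using difflib; the normalization rule and the 0.85 cutoff are assumptions, not the actual implementation:

import difflib
from typing import List, Optional, Tuple

def normalize_column_name(name: str) -> str:
    # Compare case-insensitively and ignore underscores, so that e.g.
    # "accountId", "ACCOUNT_ID", and "account_id" all normalize alike.
    return name.lower().replace("_", "")

def find_best_fuzzy_match(
    source_norm: str, dest_columns: List[Tuple[str, str]], cutoff: float = 0.85
) -> Optional[str]:
    # dest_columns holds (original_name, normalized_name) pairs, as in the hunk above.
    candidates = [dest_norm for _, dest_norm in dest_columns]
    matches = difflib.get_close_matches(source_norm, candidates, n=1, cutoff=cutoff)
    if not matches:
        return None
    for dest_col, dest_norm in dest_columns:
        if dest_norm == matches[0]:
            return dest_col
    return None

print(find_best_fuzzy_match("accountid", [("ACCOUNT_ID", "accountid"), ("NAME", "name")]))
# -> ACCOUNT_ID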
@@ -166,6 +166,97 @@ class FivetranAPIClient:
            )
        return connector_data

+    def get_table_columns(
+        self, connector_id: str, schema_name: str, table_name: str
+    ) -> List[Dict]:
+        """
+        Get detailed column information for a specific table using the tables API endpoint.
+        This is more reliable for column information than the schemas endpoint.
+
+        Args:
+            connector_id: The Fivetran connector ID
+            schema_name: The schema name
+            table_name: The table name
+
+        Returns:
+            List of column dictionaries with name, type, and other properties
+        """
+        try:
+            # URL-encode the schema and table names to handle special characters
+            import urllib.parse
+
+            encoded_schema = urllib.parse.quote(schema_name)
+            encoded_table = urllib.parse.quote(table_name)
+
+            logger.info(f"Fetching column info directly for {schema_name}.{table_name}")
+
+            # Make the API request for detailed table information
+            response = self._make_request(
+                "GET",
+                f"/connectors/{connector_id}/schemas/{encoded_schema}/tables/{encoded_table}",
+            )
+
+            # Extract column information
+            table_data = response.get("data", {})
+            logger.debug(f"Table API response structure: {list(table_data.keys())}")
+
+            columns_data = table_data.get("columns", {})
+
+            # Convert column data to a list format if it's a dictionary
+            columns = []
+            if isinstance(columns_data, dict):
+                for col_name, col_info in columns_data.items():
+                    if isinstance(col_info, dict):
+                        col_entry = (
+                            col_info.copy()
+                        )  # Create a copy to avoid modifying the original
+                        col_entry["name"] = col_name
+
+                        # Ensure there's an enabled field
+                        if "enabled" not in col_entry:
+                            col_entry["enabled"] = True
+
+                        # Add the column if it's enabled
+                        if col_entry.get("enabled", True):
+                            columns.append(col_entry)
+                    else:
+                        # Simple case where we just have column names
+                        columns.append({"name": col_name, "enabled": True})
+            elif isinstance(columns_data, list):
+                columns = [col for col in columns_data if col.get("enabled", True)]
+
+            # Check if we have name_in_destination info
+            for col in columns:
+                if (
+                    isinstance(col, dict)
+                    and "name_in_destination" not in col
+                    and "name" in col
+                ):
+                    # Add name_in_destination based on destination platform
+                    destination = self.detect_destination_platform(
+                        table_data.get("destination_id", "")
+                    )
+                    if destination.lower() == "bigquery":
+                        # Convert to snake_case for BigQuery
+                        import re
+
+                        name = col["name"]
+                        s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
+                        s2 = re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1)
+                        col["name_in_destination"] = s2.lower()
+                    elif destination.lower() in ["snowflake", "redshift"]:
+                        # Convert to uppercase for Snowflake/Redshift
+                        col["name_in_destination"] = col["name"].upper()
+
+            logger.info(
+                f"Retrieved {len(columns)} columns for {schema_name}.{table_name} via direct table API"
+            )
+            return columns
+
+        except Exception as e:
+            logger.warning(f"Failed to get columns for {schema_name}.{table_name}: {e}")
+            return []
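Why the schema and table names are URL-encoded before the endpoint path is built: a quick standalone check (the connector id and names are made up):

import urllib.parse

schema, table = "my schema", "Sales Orders"
path = (
    f"/connectors/abc123/schemas/{urllib.parse.quote(schema)}"
    f"/tables/{urllib.parse.quote(table)}"
)
print(path)  # /connectors/abc123/schemas/my%20schema/tables/Sales%20Orders
# Note: quote() leaves "/" unescaped by default (safe="/"); pass safe="" if a
# name could ever contain a slash.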
    def _enrich_salesforce_connector(
        self, connector_id: str, connector_data: Dict
    ) -> None:
@@ -254,12 +345,21 @@ class FivetranAPIClient:
        Normalize schema information into a consistent format regardless of API response structure.
        """
        schemas = []

+        # Log what we're working with
+        logger.debug(f"Raw schema response type: {type(raw_schemas)}")
+        if isinstance(raw_schemas, dict):
+            logger.debug(f"Schema keys: {list(raw_schemas.keys())}")
+        elif isinstance(raw_schemas, list):
+            logger.debug(f"Schema list length: {len(raw_schemas)}")
+            if raw_schemas:
+                logger.debug(f"First schema item type: {type(raw_schemas[0])}")
+        else:
+            logger.debug(f"Unexpected schema format: {str(raw_schemas)[:100]}...")

        # Handle different response formats
        if isinstance(raw_schemas, dict):
            # Handle nested object format (older API versions)
-            logger.debug(f"Schema keys: {list(raw_schemas.keys())}")
            logger.info(f"Converting nested schema format for connector {connector_id}")
            for schema_name, schema_data in raw_schemas.items():
                # Convert to the expected format
@@ -308,10 +408,6 @@ class FivetranAPIClient:

                schemas.append(schema_obj)
        elif isinstance(raw_schemas, list):
-            logger.debug(f"Schema list length: {len(raw_schemas)}")
-            if raw_schemas:
-                logger.debug(f"First schema item type: {type(raw_schemas[0])}")
-
            # Already in the expected list format
            schemas = raw_schemas

@@ -324,7 +420,6 @@ class FivetranAPIClient:
                    if "columns" not in table:
                        table["columns"] = []
        else:
-            logger.debug(f"Unexpected schema format: {raw_schemas[:100]}...")
            logger.warning(
                f"Unexpected schema format type for connector {connector_id}: {type(raw_schemas)}"
            )
@@ -689,58 +784,6 @@ class FivetranAPIClient:
                f"After retrieval attempts, {tables_still_missing} tables still missing column information"
            )

-    def get_table_columns(
-        self, connector_id: str, schema_name: str, table_name: str
-    ) -> List[Dict]:
-        """
-        Get detailed column information for a specific table using the tables API endpoint.
-        This is more reliable for column information than the schemas endpoint.
-        """
-        try:
-            # URL-encode the schema and table names to handle special characters
-            import urllib.parse
-
-            encoded_schema = urllib.parse.quote(schema_name)
-            encoded_table = urllib.parse.quote(table_name)
-
-            # Make the API request for detailed table information
-            response = self._make_request(
-                "GET",
-                f"/connectors/{connector_id}/schemas/{encoded_schema}/tables/{encoded_table}",
-            )
-
-            # Extract column information
-            table_data = response.get("data", {})
-            columns_data = table_data.get("columns", {})
-
-            # Convert column data to a list format if it's a dictionary
-            columns = []
-            if isinstance(columns_data, dict):
-                for col_name, col_info in columns_data.items():
-                    if isinstance(col_info, dict):
-                        col_info = (
-                            col_info.copy()
-                        )  # Create a copy to avoid modifying the original
-                        col_info["name"] = col_name
-                        if col_info.get(
-                            "enabled", True
-                        ):  # Only include enabled columns
-                            columns.append(col_info)
-                    else:
-                        # Simple case where we just have column names
-                        columns.append({"name": col_name, "enabled": True})
-            elif isinstance(columns_data, list):
-                columns = [col for col in columns_data if col.get("enabled", True)]
-
-            logger.info(
-                f"Retrieved {len(columns)} columns for {schema_name}.{table_name} via direct table API"
-            )
-            return columns
-
-        except Exception as e:
-            logger.warning(f"Failed to get columns for {schema_name}.{table_name}: {e}")
-            return []

    def _process_column_data(self, columns: Any) -> List[Dict]:
        """
        Process column data from various API response formats into a consistent format.
@@ -1348,20 +1391,15 @@ class FivetranAPIClient:
        Uses a generic approach that works for any connector type and properly handles name_in_destination.
        """
        try:
-            # Get the connector schemas first
-            schemas = self.list_connector_schemas(connector_id)
+            # Get the connector details first
+            connector_details = self.get_connector(connector_id)

-            # Log more details about what we retrieved
-            table_count = sum(len(schema.get("tables", [])) for schema in schemas)
-            logger.info(
-                f"Got {len(schemas)} schemas with {table_count} tables for connector {connector_id}"
-            )

-            # Get destination information for naming
-            connector = self.get_connector(connector_id)
-            destination_id = connector.get("group", {}).get("id", "")
+            # Get destination information
+            destination_id = connector_details.get("group", {}).get("id", "")
            destination_platform = self.detect_destination_platform(destination_id)

+            # Get schema information
+            schemas = self.list_connector_schemas(connector_id)
            lineage_list = []

            # Handle cases where schemas might be a string or invalid format
@@ -1581,9 +1619,14 @@ class FivetranAPIClient:
        """
        if destination_platform.lower() == "bigquery":
            # BigQuery column names are case-sensitive and typically lowercase
-            return column_name.lower()
+            # Also convert camelCase to snake_case
+            import re
+
+            s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", column_name)
+            s2 = re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1)
+            return s2.lower()
        else:
-            # For most other systems (Snowflake, Redshift, etc.), column names are uppercased
+            # For other platforms like Snowflake, typically uppercase
            return column_name.upper()

    def _build_lineage_from_schemas(
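Note that the API client's BigQuery branch above stops at camelCase splitting and lowercasing; unlike the improved FivetranSource version earlier in this commit, it does not scrub punctuation. A small comparison sketch (function names are ours):

import re

def api_client_bigquery(name: str) -> str:
    s1 = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s1).lower()

def improved_bigquery(name: str) -> str:
    cleaned = re.sub(r"[^a-zA-Z0-9_]", "_", api_client_bigquery(name))
    return re.sub(r"_+", "_", cleaned).strip("_")

print(api_client_bigquery("Last.Modified"))  # last._modified
print(improved_bigquery("Last.Modified"))    # last_modified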
@@ -1,5 +1,6 @@
import logging
-from typing import Dict, List, Optional, Set, Tuple
+import re
+from typing import Any, Dict, List, Optional, Set, Tuple

from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.source.fivetran.config import (
@@ -360,15 +361,117 @@ class FivetranStandardAPI(FivetranAccessInterface):

        return destinations_seen, destination_details

+    def _process_schemas_for_lineage(
+        self,
+        connector: Connector,
+        schemas: List[Dict],
+        source_table_columns: Dict[str, Dict[str, str]],
+    ) -> List[TableLineage]:
+        """
+        Process schemas to extract lineage information for a connector.
+        This was extracted from _fill_connectors_lineage to reduce complexity.
+        """
+        lineage_list = []
+        destination_platform = self._get_destination_platform(connector)
+
+        for schema in schemas:
+            try:
+                schema_name = schema.get("name", "")
+                if not schema_name:
+                    logger.warning(
+                        f"Skipping schema with no name in connector {connector.connector_id}"
+                    )
+                    continue
+
+                # Use name_in_destination if available for schema
+                schema_name_in_destination = schema.get("name_in_destination")
+
+                tables = schema.get("tables", [])
+                if not isinstance(tables, list):
+                    logger.warning(
+                        f"Schema {schema_name} has non-list tables: {type(tables)}"
+                    )
+                    continue
+
+                # Process each table in the schema
+                for table in tables:
+                    try:
+                        if not isinstance(table, dict):
+                            continue
+
+                        table_name = table.get("name", "")
+                        enabled = table.get("enabled", False)
+
+                        if not enabled or not table_name:
+                            continue
+
+                        # Create source table identifier
+                        source_table = f"{schema_name}.{table_name}"
+
+                        # Get destination schema name - prefer name_in_destination if available
+                        dest_schema = None
+                        if schema_name_in_destination:
+                            dest_schema = schema_name_in_destination
+                        else:
+                            # Fall back to case transformation if name_in_destination not available
+                            dest_schema = self._get_destination_schema_name(
+                                schema_name, destination_platform
+                            )
+
+                        # Get destination table name - prefer name_in_destination if available
+                        dest_table = None
+                        table_name_in_destination = table.get("name_in_destination")
+                        if table_name_in_destination:
+                            dest_table = table_name_in_destination
+                            logger.debug(
+                                f"Using provided name_in_destination '{dest_table}' for table {table_name}"
+                            )
+                        else:
+                            # Fall back to case transformation if name_in_destination not available
+                            dest_table = self._get_destination_table_name(
+                                table_name, destination_platform
+                            )
+                            logger.debug(
+                                f"No name_in_destination found for table {table_name}, using transformed name '{dest_table}'"
+                            )
+
+                        # Combine to create full destination table name
+                        destination_table = f"{dest_schema}.{dest_table}"
+
+                        # Process columns for lineage
+                        column_lineage = self._extract_column_lineage(
+                            table=table,
+                            source_table=source_table,
+                            destination_platform=destination_platform,
+                            source_table_columns=source_table_columns,
+                        )
+
+                        # Add this table's lineage
+                        lineage_list.append(
+                            TableLineage(
+                                source_table=source_table,
+                                destination_table=destination_table,
+                                column_lineage=column_lineage,
+                            )
+                        )
+
+                        logger.debug(
+                            f"Added lineage: {source_table} -> {destination_table} with {len(column_lineage)} columns"
+                        )
+                    except Exception as table_e:
+                        logger.warning(
+                            f"Error processing table {table.get('name', 'unknown')}: {table_e}"
+                        )
+            except Exception as schema_e:
+                logger.warning(
+                    f"Error processing schema {schema.get('name', 'unknown')}: {schema_e}"
+                )
+
+        return lineage_list
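For reference, the objects this helper emits look roughly like the following constructed example; the import path for the Fivetran data classes is assumed from the source layout:

# Import path assumed from the fivetran source module layout:
from datahub.ingestion.source.fivetran.data_classes import ColumnLineage, TableLineage

lineage = TableLineage(
    source_table="salesforce.Account",        # schema.table on the source side
    destination_table="salesforce.account",   # after name_in_destination / case rules
    column_lineage=[
        ColumnLineage(source_column="Id", destination_column="id"),
        ColumnLineage(source_column="AccountName", destination_column="account_name"),
    ],
)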
    def _fill_connectors_lineage(self, connectors: List[Connector]) -> None:
        """
        Fill in lineage information for connectors by calling the API.

-        This enhanced implementation:
-        1. Correctly handles API response data
-        2. Uses name_in_destination when available
-        3. Implements fallback column-level lineage by matching column names
-        4. Handles case transformation based on destination platform
+        Fill in lineage information for connectors by calling the API with enhanced diagnostics and robust error handling.
        """
        for connector in connectors:
            try:
@@ -387,111 +490,26 @@ class FivetranStandardAPI(FivetranAccessInterface):

                # Get schema information from API
                schemas = self.api_client.list_connector_schemas(connector.connector_id)
-                logger.info(
-                    f"Got {len(schemas)} schemas for connector {connector.connector_id}"
-                )
-
-                lineage_list = []
+                # DIAGNOSTIC: Log detailed schema information
+                self._log_schema_diagnostics(schemas)
+
+                # If we have no columns at all, try direct fetching for each table
+                if self._should_fetch_missing_columns(schemas):
+                    logger.warning(
+                        "No columns found in initial schema fetch. Attempting direct table column fetching."
+                    )
+                    self._fetch_missing_columns(connector.connector_id, schemas)
+                    self._log_schema_diagnostics(schemas)  # Log updated stats

                # First, collect all source columns with their types for each table
                # This will help with generating column-level lineage
                source_table_columns = self._collect_source_columns(schemas)

-                # Process each schema
-                for schema in schemas:
-                    try:
-                        schema_name = schema.get("name", "")
-                        if not schema_name:
-                            logger.warning(
-                                f"Skipping schema with no name in connector {connector.connector_id}"
-                            )
-                            continue
-
-                        # Use name_in_destination if available for schema
-                        schema_name_in_destination = schema.get("name_in_destination")
-
-                        tables = schema.get("tables", [])
-                        if not isinstance(tables, list):
-                            logger.warning(
-                                f"Schema {schema_name} has non-list tables: {type(tables)}"
-                            )
-                            continue
-
-                        # Process each table in the schema
-                        for table in tables:
-                            try:
-                                if not isinstance(table, dict):
-                                    continue
-
-                                table_name = table.get("name", "")
-                                enabled = table.get("enabled", False)
-
-                                if not enabled or not table_name:
-                                    continue
-
-                                # Create source table identifier
-                                source_table = f"{schema_name}.{table_name}"
-
-                                # Get destination schema name - prefer name_in_destination if available
-                                dest_schema = None
-                                if schema_name_in_destination:
-                                    dest_schema = schema_name_in_destination
-                                else:
-                                    # Fall back to case transformation if name_in_destination not available
-                                    dest_schema = self._get_destination_schema_name(
-                                        schema_name, destination_platform
-                                    )
-
-                                # Get destination table name - prefer name_in_destination if available
-                                dest_table = None
-                                table_name_in_destination = table.get(
-                                    "name_in_destination"
-                                )
-                                if table_name_in_destination:
-                                    dest_table = table_name_in_destination
-                                    logger.debug(
-                                        f"Using provided name_in_destination '{dest_table}' for table {table_name}"
-                                    )
-                                else:
-                                    # Fall back to case transformation if name_in_destination not available
-                                    dest_table = self._get_destination_table_name(
-                                        table_name, destination_platform
-                                    )
-                                    logger.debug(
-                                        f"No name_in_destination found for table {table_name}, using transformed name '{dest_table}'"
-                                    )
-
-                                # Combine to create full destination table name
-                                destination_table = f"{dest_schema}.{dest_table}"
-
-                                # Process columns for lineage
-                                column_lineage = self._extract_column_lineage(
-                                    table=table,
-                                    source_table=source_table,
-                                    destination_platform=destination_platform,
-                                    source_table_columns=source_table_columns,
-                                )
-
-                                # Add this table's lineage
-                                lineage_list.append(
-                                    TableLineage(
-                                        source_table=source_table,
-                                        destination_table=destination_table,
-                                        column_lineage=column_lineage,
-                                    )
-                                )
-
-                                logger.debug(
-                                    f"Added lineage: {source_table} -> {destination_table} with {len(column_lineage)} columns"
-                                )
-                            except Exception as table_e:
-                                logger.warning(
-                                    f"Error processing table {table.get('name', 'unknown')}: {table_e}"
-                                )
-                    except Exception as schema_e:
-                        logger.warning(
-                            f"Error processing schema {schema.get('name', 'unknown')}: {schema_e}"
-                        )
+                # Process schemas to extract lineage information
+                lineage_list = self._process_schemas_for_lineage(
+                    connector, schemas, source_table_columns
+                )

                # Truncate if necessary
                if len(lineage_list) > MAX_TABLE_LINEAGE_PER_CONNECTOR:
@@ -503,9 +521,8 @@ class FivetranStandardAPI(FivetranAccessInterface):

                connector.lineage = lineage_list

-                logger.info(
-                    f"Successfully extracted {len(lineage_list)} table lineages for connector {connector.connector_id}"
-                )
+                # Final stats logging
+                self._log_lineage_stats(lineage_list, connector.connector_id)

            except Exception as e:
                logger.error(
@@ -514,6 +531,92 @@ class FivetranStandardAPI(FivetranAccessInterface):
                )
                connector.lineage = []

+    def _log_schema_diagnostics(self, schemas: List[Dict]) -> None:
+        """Log diagnostic information about schemas and their columns."""
+        total_columns = 0
+        total_tables_with_columns = 0
+        total_tables = 0
+
+        for schema in schemas:
+            schema_name = schema.get("name", "")
+            for table in schema.get("tables", []):
+                total_tables += 1
+                table_name = table.get("name", "")
+                columns = table.get("columns", [])
+
+                if columns:
+                    total_tables_with_columns += 1
+                    total_columns += len(columns)
+                    logger.info(
+                        f"Table {schema_name}.{table_name} has {len(columns)} columns"
+                    )
+
+                    # DIAGNOSTIC: Print a sample of column names
+                    column_names = [col.get("name", "unknown") for col in columns[:5]]
+                    logger.info(f"Sample columns: {column_names}")
+                else:
+                    logger.warning(f"Table {schema_name}.{table_name} has NO columns")
+
+        logger.info(
+            f"SCHEMA STATS: {total_tables_with_columns}/{total_tables} tables have columns, total {total_columns} columns"
+        )
+
+    def _should_fetch_missing_columns(self, schemas: List[Dict]) -> bool:
+        """Determine if we need to fetch missing columns based on schema content."""
+        total_columns = 0
+        total_tables = 0
+
+        for schema in schemas:
+            for table in schema.get("tables", []):
+                total_tables += 1
+                columns = table.get("columns", [])
+                if columns:
+                    total_columns += len(columns)
+
+        return total_columns == 0 and total_tables > 0
+
+    def _log_lineage_stats(
+        self, lineage_list: List[TableLineage], connector_id: str
+    ) -> None:
+        """Log statistics about lineage processing."""
+        tables_with_columns = len(
+            [
+                table_lineage
+                for table_lineage in lineage_list
+                if table_lineage.column_lineage
+            ]
+        )
+        total_column_mappings = sum(
+            len(table_lineage.column_lineage) for table_lineage in lineage_list
+        )
+        logger.info(
+            f"Lineage stats for connector {connector_id}: "
+            f"{len(lineage_list)} table lineages, {tables_with_columns} tables with column lineage, "
+            f"{total_column_mappings} total column mappings"
+        )
+
+    def _fetch_missing_columns(self, connector_id: str, schemas: List[Dict]) -> None:
+        """Attempt to fetch missing column information directly for each table."""
+        for schema in schemas:
+            schema_name = schema.get("name", "")
+            for table in schema.get("tables", []):
+                table_name = table.get("name", "")
+                if not table.get("columns") and table.get("enabled", True):
+                    try:
+                        # Try direct column fetching
+                        columns = self.api_client.get_table_columns(
+                            connector_id, schema_name, table_name
+                        )
+                        if columns:
+                            table["columns"] = columns
+                            logger.info(
+                                f"Directly fetched {len(columns)} columns for {schema_name}.{table_name}"
+                            )
+                    except Exception as e:
+                        logger.warning(
+                            f"Failed to directly fetch columns for {schema_name}.{table_name}: {e}"
+                        )
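The detect-then-repair flow above in miniature: the predicate fires only when tables came back but not a single column did. A self-contained sketch:

from typing import Dict, List

def should_fetch_missing_columns(schemas: List[Dict]) -> bool:
    # True only when the schema fetch returned tables but zero columns overall.
    total_tables = 0
    total_columns = 0
    for schema in schemas:
        for table in schema.get("tables", []):
            total_tables += 1
            total_columns += len(table.get("columns", []))
    return total_columns == 0 and total_tables > 0

schemas = [{"name": "crm", "tables": [{"name": "account", "columns": []}]}]
print(should_fetch_missing_columns(schemas))  # True -> trigger per-table fetching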
    def _collect_source_columns(self, schemas: List[Dict]) -> Dict[str, Dict[str, str]]:
        """
        Collect all source columns with their types for each table.
@@ -559,6 +662,88 @@ class FivetranStandardAPI(FivetranAccessInterface):

        return source_columns
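The body of _collect_source_columns is cut off by the hunk; based on how source_table_columns is consumed elsewhere in this diff, it plausibly returns a "schema.table" -> {column: type} map, roughly like this sketch:

from typing import Dict, List

def collect_source_columns(schemas: List[Dict]) -> Dict[str, Dict[str, str]]:
    source_columns: Dict[str, Dict[str, str]] = {}
    for schema in schemas:
        schema_name = schema.get("name", "")
        for table in schema.get("tables", []):
            key = f"{schema_name}.{table.get('name', '')}"
            source_columns[key] = {
                col["name"]: col.get("type", "UNKNOWN")
                for col in table.get("columns", [])
                if isinstance(col, dict) and col.get("name")
            }
    return source_columns

schemas = [{"name": "crm", "tables": [{"name": "account",
            "columns": [{"name": "Id", "type": "STRING"}]}]}]
print(collect_source_columns(schemas))  # {'crm.account': {'Id': 'STRING'}}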
+    def _process_columns_from_list(
+        self, columns: List[Any], is_bigquery: bool, source_table: str
+    ) -> List[ColumnLineage]:
+        """
+        Process columns from list format and create column lineage.
+        Extracted from _extract_column_lineage to reduce complexity.
+        """
+        column_lineage = []
+
+        for column in columns:
+            col_name = None
+            if isinstance(column, dict):
+                col_name = column.get("name")
+            elif isinstance(column, str):
+                col_name = column
+
+            if not col_name:
+                continue
+
+            # Skip Fivetran system columns
+            if col_name.startswith("_fivetran"):
+                continue
+
+            # Get destination column name - prefer name_in_destination if available
+            dest_col_name = None
+            if isinstance(column, dict) and "name_in_destination" in column:
+                dest_col_name = column.get("name_in_destination")
+                logger.debug(
+                    f"Using name_in_destination: {col_name} -> {dest_col_name}"
+                )
+
+            # If no name_in_destination, transform based on platform
+            if not dest_col_name:
+                dest_col_name = self._transform_column_name_for_platform(
+                    col_name, is_bigquery
+                )
+                logger.debug(f"Transformed name: {col_name} -> {dest_col_name}")
+
+            # Add to lineage
+            column_lineage.append(
+                ColumnLineage(
+                    source_column=col_name,
+                    destination_column=dest_col_name,
+                )
+            )
+
+        logger.info(
+            f"Created {len(column_lineage)} column lineage entries for {source_table}"
+        )
+        return column_lineage
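The precedence rule in _process_columns_from_list, in isolation: an API-provided name_in_destination wins; otherwise fall back to the platform transform (simplified here to a bare case change):

def dest_name(column: dict, is_bigquery: bool) -> str:
    explicit = column.get("name_in_destination")
    if explicit:
        return explicit
    name = column["name"]
    return name.lower() if is_bigquery else name.upper()

print(dest_name({"name": "AccountId", "name_in_destination": "account_id"}, True))  # account_id
print(dest_name({"name": "AccountId"}, False))  # ACCOUNTID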
+    def _process_columns_from_source(
+        self, source_columns: Dict[str, str], is_bigquery: bool, source_table: str
+    ) -> List[ColumnLineage]:
+        """
+        Create column lineage from source column information.
+        Extracted from _extract_column_lineage to reduce complexity.
+        """
+        column_lineage = []
+
+        logger.info(f"Using {len(source_columns)} columns from source table schema")
+
+        # Create lineage for each source column
+        for col_name in source_columns:
+            if col_name.startswith("_fivetran"):
+                continue
+
+            # Transform destination column name based on platform
+            dest_col_name = self._transform_column_name_for_platform(
+                col_name, is_bigquery
+            )
+
+            column_lineage.append(
+                ColumnLineage(
+                    source_column=col_name,
+                    destination_column=dest_col_name,
+                )
+            )
+
+        logger.info(f"Created {len(column_lineage)} fallback column lineage entries")
+        return column_lineage

    def _extract_column_lineage(
        self,
        table: Dict,
@@ -567,7 +752,7 @@ class FivetranStandardAPI(FivetranAccessInterface):
        source_table_columns: Dict[str, Dict[str, str]],
    ) -> List[ColumnLineage]:
        """
-        Extract column-level lineage for a table, with fallback to name matching if needed.
+        Extract column-level lineage for a table with improved debugging and fallback.

        Args:
            table: Table data from API
@@ -578,83 +763,110 @@ class FivetranStandardAPI(FivetranAccessInterface):
        Returns:
            List of ColumnLineage objects
        """
-        column_lineage = []
+        # Get detailed logging for debugging
+        logger.info(
+            f"Extracting column lineage for {source_table} to {destination_platform}"
+        )
+        logger.debug(f"Table data keys: {list(table.keys())}")

        # Get columns from the API response
        columns = table.get("columns", [])

+        # DIAGNOSTIC: Print details about what we got
+        self._log_column_diagnostics(columns)
+
+        # CRITICAL FIX: Convert dict format to list if needed
+        if isinstance(columns, dict):
+            columns = self._convert_column_dict_to_list(columns)
+
+        is_bigquery = destination_platform.lower() == "bigquery"
+        column_lineage = []

        # If we have columns, create lineage mappings
        if isinstance(columns, list) and columns:
-            # API provided column information, use it
-            for column in columns:
-                if not isinstance(column, dict):
-                    continue
-
-                col_name = column.get("name", "")
-                if not col_name:
-                    continue
-
-                # Skip Fivetran system columns
-                if col_name.startswith("_fivetran"):
-                    continue
-
-                # Get destination column name - prefer name_in_destination if available
-                dest_col_name = None
-                column_name_in_destination = column.get("name_in_destination")
-
-                if column_name_in_destination:
-                    dest_col_name = column_name_in_destination
-                    logger.debug(
-                        f"Using provided name_in_destination '{dest_col_name}' for column {col_name}"
-                    )
-                else:
-                    # Fall back to case transformation if name_in_destination not available
-                    dest_col_name = self._get_destination_column_name(
-                        col_name, destination_platform
-                    )
-                    logger.debug(
-                        f"No name_in_destination found for column {col_name}, using transformed name '{dest_col_name}'"
-                    )
-
-                column_lineage.append(
-                    ColumnLineage(
-                        source_column=col_name,
-                        destination_column=dest_col_name,
-                    )
-                )
+            column_lineage = self._process_columns_from_list(
+                columns, is_bigquery, source_table
+            )
        else:
-            # No column information from API, use source table columns if available
+            # No column information from API, try other methods
+            logger.warning(f"No usable column information for {source_table} from API")

+            # Use source_table_columns if available (fallback method)
            source_columns = source_table_columns.get(source_table, {})

            if source_columns:
-                logger.info(
-                    f"No column information from API for {source_table}. "
-                    f"Using source table schema with {len(source_columns)} columns"
+                column_lineage = self._process_columns_from_source(
+                    source_columns, is_bigquery, source_table
                )
+            else:
+                # Try to use column mapping from config if available
+                column_lineage = self._try_get_column_mapping_from_config(source_table)

-                # For each source column, create a lineage entry assuming it has the same name in destination
-                # with appropriate case transformation
-                for col_name in source_columns:
-                    # Skip Fivetran system columns
-                    if col_name.startswith("_fivetran"):
-                        continue
-
-                    # Transform column name based on destination platform
-                    dest_col_name = self._get_destination_column_name(
-                        col_name, destination_platform
-                    )
-
-                    column_lineage.append(
-                        ColumnLineage(
-                            source_column=col_name,
-                            destination_column=dest_col_name,
-                        )
-                    )

-        # Log whether we found column lineage
+        # Final check and log
        if column_lineage:
-            logger.info(f"Found {len(column_lineage)} columns for {source_table}")
+            logger.info(
+                f"Returning {len(column_lineage)} column lineage mappings for {source_table}"
+            )
        else:
-            logger.warning(f"No column lineage found for {source_table}")
+            logger.warning(f"No column lineage mappings created for {source_table}")

        return column_lineage

+    def _log_column_diagnostics(self, columns: Any) -> None:
+        """Log diagnostic information about column data."""
+        if isinstance(columns, list):
+            logger.info(f"Found {len(columns)} columns in list format")
+            if columns:
+                sample = columns[:2]
+                logger.debug(f"Sample columns: {sample}")
+        elif isinstance(columns, dict):
+            logger.info(f"Found {len(columns)} columns in dict format")
+            if columns:
+                sample_keys = list(columns.keys())[:2]
+                logger.debug(f"Sample column keys: {sample_keys}")
+        else:
+            logger.warning(f"Columns in unexpected format: {type(columns)}")

+    def _convert_column_dict_to_list(self, columns_dict: Dict) -> List[Dict]:
+        """Convert column dictionary to list format for consistent processing."""
+        columns_list = []
+        for col_name, col_data in columns_dict.items():
+            if isinstance(col_data, dict):
+                col_data = col_data.copy()
+                col_data["name"] = col_name
+                columns_list.append(col_data)
+            else:
+                columns_list.append({"name": col_name})
+
+        logger.info(f"Converted dict format to list with {len(columns_list)} columns")
+        return columns_list
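Example input and output for the dict-to-list conversion above, with a fabricated column payload (a standalone copy for quick testing):

from typing import Dict, List

def convert_column_dict_to_list(columns_dict: Dict) -> List[Dict]:
    columns_list: List[Dict] = []
    for col_name, col_data in columns_dict.items():
        if isinstance(col_data, dict):
            entry = col_data.copy()
            entry["name"] = col_name
            columns_list.append(entry)
        else:
            columns_list.append({"name": col_name})
    return columns_list

print(convert_column_dict_to_list({"id": {"type": "INT"}, "synced": True}))
# [{'type': 'INT', 'name': 'id'}, {'name': 'synced'}]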
+    def _try_get_column_mapping_from_config(
+        self, source_table: str
+    ) -> List[ColumnLineage]:
+        """Try to get column mapping from configuration if available."""
+        column_lineage = []
+
+        # Check if there's a config attribute with column mapping info
+        if hasattr(self, "config") and self.config:
+            # Check for any attribute that might have column mappings
+            for attr_name in dir(self.config):
+                if "column_mapping" in attr_name.lower() and hasattr(
+                    self.config, attr_name
+                ):
+                    mapping_attr = getattr(self.config, attr_name)
+                    if isinstance(mapping_attr, dict) and source_table in mapping_attr:
+                        column_mapping = mapping_attr.get(source_table, {})
+                        logger.info(
+                            f"Found config column mapping for {source_table}: {len(column_mapping)} columns"
+                        )
+
+                        for source_col, dest_col in column_mapping.items():
+                            column_lineage.append(
+                                ColumnLineage(
+                                    source_column=source_col,
+                                    destination_column=dest_col,
+                                )
+                            )
+
+        return column_lineage
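A hypothetical config shape that the attribute scan above would pick up; the attribute and table names here are invented for illustration:

class DummyConfig:
    table_column_mapping = {
        "salesforce.Account": {"Id": "id", "AccountName": "account_name"},
    }

config = DummyConfig()
for attr_name in dir(config):
    if "column_mapping" in attr_name.lower():
        print(getattr(config, attr_name)["salesforce.Account"])
# {'Id': 'id', 'AccountName': 'account_name'}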
@@ -741,16 +953,36 @@ class FivetranStandardAPI(FivetranAccessInterface):
            # For most other systems (Snowflake, Redshift, etc.), table names are uppercased
            return table_name.upper()

-    def _get_destination_column_name(
-        self, column_name: str, destination_platform: str
+    def _transform_column_name_for_platform(
+        self, column_name: str, is_bigquery: bool
    ) -> str:
        """
-        Get the destination column name based on the platform.
-        This is a helper method that applies appropriate case transformations.
+        Transform column name based on the destination platform with better handling of edge cases.
        """
-        if destination_platform.lower() == "bigquery":
-            # BigQuery column names are case-sensitive and typically lowercase
-            return column_name.lower()
+        if not column_name:
+            return ""
+
+        if is_bigquery:
+            # For BigQuery:
+            # 1. Convert to lowercase
+            # 2. Replace camelCase with snake_case
+            # 3. Clean up any invalid characters
+
+            # Step 1: Convert camelCase to snake_case with regex
+            s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", column_name)
+            s2 = re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1)
+
+            # Step 2: lowercase and replace non-alphanumeric with underscore
+            transformed = re.sub(r"[^a-zA-Z0-9_]", "_", s2.lower())
+
+            # Step 3: Remove leading/trailing underscores and collapse multiple underscores
+            transformed = re.sub(r"_+", "_", transformed).strip("_")
+
+            # Log the transformation for debugging
+            if transformed != column_name.lower():
+                logger.debug(f"Transformed column: {column_name} -> {transformed}")
+
+            return transformed
        else:
-            # For most other systems (Snowflake, Redshift, etc.), column names are uppercased
+            # For other platforms like Snowflake, typically uppercase
            return column_name.upper()