remove unused methods

Jonny Dixon 2025-03-20 22:45:59 +00:00
parent 23e2cb0273
commit d3516b234b
2 changed files with 124 additions and 381 deletions

View File

@@ -1,7 +1,5 @@
import difflib
import logging
import re
from typing import Dict, Iterable, List, Optional, Tuple
from typing import Dict, Iterable, List, Optional
import datahub.emitter.mce_builder as builder
from datahub.api.entities.datajob import DataFlow, DataJob
@@ -347,104 +345,6 @@ class FivetranSource(StatefulIngestionSourceBase):
)
return None
def _normalize_column_name(self, column_name: str) -> str:
"""Normalize column name for comparison by removing non-alphanumeric chars and converting to lowercase."""
# Remove non-alphanumeric characters and convert to lowercase
normalized = re.sub(r"[^a-zA-Z0-9]", "", column_name).lower()
return normalized
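For context while reading the matching code below, the removed normalizer is worth seeing on its own. A minimal standalone sketch; the free-function name normalize is illustrative, not from the source:

import re

def normalize(column_name: str) -> str:
    # Strip every non-alphanumeric character, then lowercase, so the
    # same field spelled in different naming styles compares equal.
    return re.sub(r"[^a-zA-Z0-9]", "", column_name).lower()

# "BillingStreet", "billing_street", and "Billing-Street" all reduce
# to "billingstreet", which is what made the fuzzy matching work.
assert normalize("BillingStreet") == normalize("billing_street") == "billingstreet"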
def _transform_column_name_for_platform(
self, column_name: str, is_bigquery: bool
) -> str:
"""Transform column name based on the destination platform with better handling of edge cases."""
if not column_name:
return ""
if is_bigquery:
# For BigQuery:
# 1. Convert to lowercase
# 2. Replace camelCase with snake_case
# 3. Clean up any invalid characters
import re
# Step 1: Convert camelCase to snake_case with regex
s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", column_name)
s2 = re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1)
# Step 2: lowercase and replace non-alphanumeric with underscore
transformed = re.sub(r"[^a-zA-Z0-9_]", "_", s2.lower())
# Step 3: Collapse repeated underscores, then strip leading/trailing ones
transformed = re.sub(r"_+", "_", transformed).strip("_")
# Log the transformation for debugging
if transformed != column_name.lower():
logger.debug(f"Transformed column: {column_name} -> {transformed}")
return transformed
else:
# For other platforms like Snowflake, typically uppercase
return column_name.upper()
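The BigQuery branch above is the classic two-pass camelCase-to-snake_case regex. A minimal standalone sketch of the same transform; to_bigquery_name is an illustrative name, not from the source:

import re

def to_bigquery_name(column_name: str) -> str:
    # Pass 1: split an uppercase run followed by a lowercase tail
    # ("HTTPServer" -> "HTTP_Server").
    s = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", column_name)
    # Pass 2: split lower/digit-to-upper boundaries ("someField" -> "some_Field").
    s = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s)
    # Lowercase, replace invalid characters, collapse and trim underscores.
    s = re.sub(r"[^a-zA-Z0-9_]", "_", s.lower())
    return re.sub(r"_+", "_", s).strip("_")

assert to_bigquery_name("BillingStreet") == "billing_street"
assert to_bigquery_name("APIKey2") == "api_key2"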
def _find_best_fuzzy_match(
self, source_col: str, source_norm: str, dest_columns: List[Tuple[str, str]]
) -> Optional[str]:
"""Find best fuzzy match for a source column from destination columns.
Args:
source_col: Original source column name
source_norm: Normalized source column name
dest_columns: List of (original_dest, normalized_dest) tuples
Returns:
Best matching destination column name or None if no good match found
"""
# First try to match normalized versions with high cutoff
dest_norms = [dest_norm for _, dest_norm in dest_columns]
matches = difflib.get_close_matches(source_norm, dest_norms, n=1, cutoff=0.8)
if matches:
# Find original dest column with this normalized value
matched_norm = matches[0]
for dest_col, dest_norm in dest_columns:
if dest_norm == matched_norm:
return dest_col
# If no high-quality match found, try a lower threshold on original names
# This helps with acronyms and abbreviated field names
dest_cols = [dest_col for dest_col, _ in dest_columns]
matches = difflib.get_close_matches(source_col, dest_cols, n=1, cutoff=0.6)
if matches:
return matches[0]
# Try special patterns like converting "someField" to "some_field"
snake_case = re.sub("([a-z0-9])([A-Z])", r"\1_\2", source_col).lower()
for dest_col, _ in dest_columns:
if dest_col.lower() == snake_case:
return dest_col
# If source_col contains words that are also in a destination column, consider it a match
# This helps with "BillingStreet" matching "billing_street" or "street_billing"
words = re.findall(r"[A-Z][a-z]+|[a-z]+|[0-9]+", source_col)
if words:
word_matches = {}
for dest_col, _ in dest_columns:
# Count how many words from source appear in destination
dest_words = re.findall(r"[A-Z][a-z]+|[a-z]+|[0-9]+", dest_col)
common_words = len(
set(w.lower() for w in words) & set(w.lower() for w in dest_words)
)
if common_words > 0:
word_matches[dest_col] = common_words
# If we found matches based on common words, return the one with most matches
if word_matches:
return max(word_matches.items(), key=lambda x: x[1])[0]
# No good match found
return None
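The first tier of the removed cascade leans on difflib.get_close_matches over normalized names; the lower-cutoff raw-name retry, snake_case equality, and shared-word counting only run when that fails. A toy sketch of that first tier, with hypothetical column names:

import difflib

def norm(s: str) -> str:
    # Same normalization idea as above.
    return "".join(ch for ch in s if ch.isalnum()).lower()

dest_columns = ["billing_street", "shipping_street", "account_id"]
matches = difflib.get_close_matches(
    norm("BillingStreet"), [norm(d) for d in dest_columns], n=1, cutoff=0.8
)
print(matches)  # ['billingstreet'] -- mapped back to 'billing_street' by the caller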
def _create_column_lineage(
self,
lineage: TableLineage,
@@ -536,70 +436,6 @@ class FivetranSource(StatefulIngestionSourceBase):
f"Failed to create column lineage for {column_lineage.source_column} -> {column_lineage.destination_column}: {e}"
)
def _create_field_lineage_mcp(
self,
source_urn: DatasetUrn,
dest_urn: DatasetUrn,
lineage_field_map: Dict[str, List[str]],
) -> Optional[MetadataWorkUnit]:
"""
Create field-level lineage between datasets using MetadataChangeProposal.
Args:
source_urn: Source dataset URN
dest_urn: Destination dataset URN
lineage_field_map: Map of destination field URNs to lists of source field URNs
"""
try:
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
FineGrainedLineage,
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
Upstream,
UpstreamLineage,
)
# Create the upstream relationship
upstream = Upstream(
dataset=str(source_urn), type=DatasetLineageType.TRANSFORMED
)
# Create fine-grained lineages for each field mapping
fine_grained_lineages = []
for dest_field, source_fields in lineage_field_map.items():
fine_grained_lineages.append(
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=source_fields,
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[dest_field],
)
)
# Create the lineage aspect
upstream_lineage = UpstreamLineage(
upstreams=[upstream], fineGrainedLineages=fine_grained_lineages
)
# Create and emit the MCP
lineage_mcp = MetadataChangeProposalWrapper(
entityUrn=str(dest_urn),
aspect=upstream_lineage,
)
# Now create a workunit from this MCP
wu = MetadataWorkUnit(id=f"{dest_urn}-field-lineage", mcp=lineage_mcp)
# Return the workunit - it will be collected and emitted by the main process
return wu
except Exception as e:
logger.error(f"Error creating field-level lineage MCP: {e}", exc_info=True)
return None
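The removed method's lineage_field_map argument was keyed by destination field URNs. A sketch of how such a map is typically assembled with the datahub emitter helpers; the platform names and dataset paths are hypothetical:

import datahub.emitter.mce_builder as builder

# Hypothetical dataset URNs for illustration only.
source_urn = builder.make_dataset_urn("postgres", "jaffle_shop.public.customers")
dest_urn = builder.make_dataset_urn("snowflake", "analytics.jaffle_shop.customers")

# The shape the method consumed: destination field URN -> source field URNs.
lineage_field_map = {
    builder.make_schema_field_urn(dest_urn, "CUSTOMER_ID"): [
        builder.make_schema_field_urn(source_urn, "customer_id"),
    ],
}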
def _build_lineage_properties(
self,
connector: Connector,

View File

@@ -473,6 +473,8 @@ class FivetranStandardAPI(FivetranAccessInterface):
"""
Fill in lineage information for connectors by calling the API with enhanced diagnostics and robust error handling.
"""
self._connector_cache = connectors
for connector in connectors:
try:
logger.info(
@@ -695,14 +697,128 @@ class FivetranStandardAPI(FivetranAccessInterface):
return source_columns
def _process_columns_from_list(
self, columns: List[Any], is_bigquery: bool, source_table: str
def _extract_column_lineage(
self,
table: Dict,
source_table: str,
destination_platform: str,
source_table_columns: Dict[str, Dict[str, str]],
) -> List[ColumnLineage]:
"""
Process columns from list format and create column lineage.
Extracted from _extract_column_lineage to reduce complexity.
"""
"""Extract column lineage with enhanced retrieval and mapping."""
logger.info(
f"Extracting column lineage for {source_table} to {destination_platform}"
)
# Get columns from different sources
columns = self._get_columns_from_sources(
table, source_table, source_table_columns
)
# If we still don't have any columns, return empty list
if not columns:
logger.warning(f"No column information available for {source_table}")
return []
# Create column lineage
column_lineage = self._create_column_lineage_from_columns(
columns, source_table, destination_platform
)
# Log results
if column_lineage:
logger.info(
f"Created {len(column_lineage)} column lineage entries for {source_table}"
)
else:
logger.warning(f"Failed to create any column lineage for {source_table}")
return column_lineage
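The schemas API can report a table's columns either as a list of objects or as a dict keyed by column name; both shapes below feed this pipeline. The field values are illustrative:

# List form, as seen from some endpoints:
table_list_form = {
    "name": "customers",
    "columns": [{"name": "customerId", "name_in_destination": "customer_id"}],
}

# Dict form, keyed by column name; _convert_column_dict_to_list below
# folds the key back in as "name".
table_dict_form = {
    "name": "customers",
    "columns": {"customerId": {"name_in_destination": "customer_id"}},
}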
def _get_columns_from_sources(
self,
table: Dict,
source_table: str,
source_table_columns: Dict[str, Dict[str, str]],
) -> List[Dict]:
"""Get columns from various sources."""
# 1. First try to get columns from the table data
columns = table.get("columns", [])
# Handle different column formats
if isinstance(columns, dict):
# Convert dict format to list
columns = self._convert_column_dict_to_list(columns)
# 2. If no columns found, try to retrieve them from the schemas endpoint
if not columns:
columns = self._get_columns_from_schemas_endpoint(source_table)
# 3. If still no columns, try source_table_columns
if not columns and source_table in source_table_columns:
logger.info(f"Using columns from source_table_columns for {source_table}")
columns = [
{"name": col_name, "type": col_type}
for col_name, col_type in source_table_columns[source_table].items()
]
return columns
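A sketch of the tier-3 fallback, assuming the first two lookups came back empty and using a toy cached schema:

# Rebuild column dicts from the connector's cached source schema when
# neither the table payload nor the schemas endpoint produced columns.
source_table_columns = {"public.customers": {"customer_id": "int", "email": "varchar"}}
columns = [
    {"name": name, "type": col_type}
    for name, col_type in source_table_columns["public.customers"].items()
]
# -> [{'name': 'customer_id', 'type': 'int'}, {'name': 'email', 'type': 'varchar'}]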
def _convert_column_dict_to_list(self, columns_dict: Dict) -> List[Dict]:
"""Convert column dictionary to list format."""
columns_list = []
for col_name, col_data in columns_dict.items():
if isinstance(col_data, dict):
col_data = col_data.copy()
col_data["name"] = col_name
columns_list.append(col_data)
else:
columns_list.append({"name": col_name})
return columns_list
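An input/output sketch with illustrative values; note that non-dict metadata degrades to a name-only entry:

columns_dict = {
    "customer_id": {"type": "int", "name_in_destination": "customer_id"},
    "email": "varchar",  # non-dict value: only the name survives
}
# -> [{'type': 'int', 'name_in_destination': 'customer_id', 'name': 'customer_id'},
#     {'name': 'email'}]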
def _get_columns_from_schemas_endpoint(self, source_table: str) -> List[Dict]:
"""Try to get columns from the schemas endpoint."""
columns: List[Dict] = []
if not hasattr(self.api_client, "get_table_columns"):
return columns
logger.info("No columns found in table data, trying schemas endpoint")
schema_name, table_name = None, None
if "." in source_table:
schema_name, table_name = source_table.split(".", 1)
if not (schema_name and table_name):
return columns
try:
connector_id = self._find_connector_id_for_source_table(source_table)
if connector_id:
columns = self.api_client.get_table_columns(
connector_id, schema_name, table_name
)
logger.info(f"Retrieved {len(columns)} columns from schemas endpoint")
except Exception as e:
logger.warning(f"Failed to get columns from schemas endpoint: {e}")
return columns
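A small note on the schema.table parsing above: maxsplit=1 matters when table names themselves contain dots.

# Split once so a dotted table name keeps its suffix intact.
schema_name, table_name = "salesforce.Account.History".split(".", 1)
# -> ("salesforce", "Account.History")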
def _find_connector_id_for_source_table(self, source_table: str) -> Optional[str]:
"""Find the connector ID for a source table."""
for conn in getattr(self, "_connector_cache", []):
if hasattr(conn, "connector_id") and source_table in str(conn.lineage):
return conn.connector_id
return None
def _create_column_lineage_from_columns(
self,
columns: List[Dict],
source_table: str,
destination_platform: str,
) -> List[ColumnLineage]:
"""Create column lineage objects from column data."""
column_lineage = []
is_bigquery = destination_platform.lower() == "bigquery"
for column in columns:
col_name = None
@@ -711,11 +827,7 @@ class FivetranStandardAPI(FivetranAccessInterface):
elif isinstance(column, str):
col_name = column
if not col_name:
continue
# Skip Fivetran system columns
if col_name.startswith("_fivetran"):
if not col_name or col_name.startswith("_fivetran"):
continue
# Get destination column name - prefer name_in_destination if available
@@ -741,167 +853,6 @@ class FivetranStandardAPI(FivetranAccessInterface):
)
)
logger.info(
f"Created {len(column_lineage)} column lineage entries for {source_table}"
)
return column_lineage
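Destination-name resolution in this method prefers the API-reported name_in_destination and only falls back to the platform transform. A condensed, self-contained sketch with a toy column dict; fallback_name abbreviates the earlier to_bigquery_name idea:

import re

def fallback_name(name: str) -> str:
    # Abbreviated camelCase-to-snake_case fallback.
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name).lower()

column = {"name": "BillingStreet", "name_in_destination": "billing_street"}
dest = column.get("name_in_destination") or fallback_name(column["name"])
# -> "billing_street"; the API-reported name takes precedence when present.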
def _process_columns_from_source(
self, source_columns: Dict[str, str], is_bigquery: bool, source_table: str
) -> List[ColumnLineage]:
"""
Create column lineage from source column information.
Extracted from _extract_column_lineage to reduce complexity.
"""
column_lineage = []
logger.info(f"Using {len(source_columns)} columns from source table schema")
# Create lineage for each source column
for col_name in source_columns:
if col_name.startswith("_fivetran"):
continue
# Transform destination column name based on platform
dest_col_name = self._transform_column_name_for_platform(
col_name, is_bigquery
)
column_lineage.append(
ColumnLineage(
source_column=col_name,
destination_column=dest_col_name,
)
)
logger.info(f"Created {len(column_lineage)} fallback column lineage entries")
return column_lineage
def _extract_column_lineage(
self,
table: Dict,
source_table: str,
destination_platform: str,
source_table_columns: Dict[str, Dict[str, str]],
) -> List[ColumnLineage]:
"""
Extract column-level lineage for a table with improved matching techniques.
Args:
table: Table data from API
source_table: Full source table name (schema.table)
destination_platform: Destination platform type
source_table_columns: Dict mapping table names to column information
Returns:
List of ColumnLineage objects
"""
# Get detailed logging for debugging
logger.info(
f"Extracting column lineage for {source_table} to {destination_platform}"
)
logger.debug(f"Table data keys: {list(table.keys())}")
# Get columns from the API response
columns = table.get("columns", [])
# Log what we're working with
if isinstance(columns, list):
logger.info(f"Found {len(columns)} columns in list format")
elif isinstance(columns, dict):
logger.info(f"Found {len(columns)} columns in dict format")
# Convert dict format to list if needed
columns_list = []
for col_name, col_data in columns.items():
if isinstance(col_data, dict):
col_data = col_data.copy()
col_data["name"] = col_name
columns_list.append(col_data)
else:
columns_list.append({"name": col_name})
columns = columns_list
logger.info(f"Converted dict format to list with {len(columns)} columns")
else:
logger.warning(f"Columns in unexpected format: {type(columns)}")
is_bigquery = destination_platform.lower() == "bigquery"
column_lineage = []
# First try direct mapping from columns if available
if isinstance(columns, list) and columns:
for column in columns:
col_name = None
if isinstance(column, dict):
col_name = column.get("name")
elif isinstance(column, str):
col_name = column
if not col_name or col_name.startswith("_fivetran"):
continue
# Get destination column name - prefer name_in_destination if available
dest_col_name = None
if isinstance(column, dict) and "name_in_destination" in column:
dest_col_name = column.get("name_in_destination")
logger.debug(
f"Using name_in_destination: {col_name} -> {dest_col_name}"
)
# If no name_in_destination, transform based on platform
if not dest_col_name:
dest_col_name = self._transform_column_name_for_platform(
col_name, is_bigquery
)
logger.debug(f"Transformed name: {col_name} -> {dest_col_name}")
# Add to lineage
column_lineage.append(
ColumnLineage(
source_column=col_name,
destination_column=dest_col_name,
)
)
# If we got column lineage, return it
if column_lineage:
logger.info(
f"Created {len(column_lineage)} column lineage entries using direct mapping"
)
return column_lineage
# No direct column mapping, try to derive from source_table_columns
if source_table in source_table_columns:
logger.info(
f"Attempting to derive column lineage from source_table_columns for {source_table}"
)
source_cols = source_table_columns[source_table]
for col_name in source_cols:
if col_name.startswith("_fivetran"):
continue
# Transform destination column name based on platform
dest_col_name = self._transform_column_name_for_platform(
col_name, is_bigquery
)
column_lineage.append(
ColumnLineage(
source_column=col_name,
destination_column=dest_col_name,
)
)
logger.info(
f"Created {len(column_lineage)} column lineage entries using source_table_columns"
)
else:
logger.warning(f"No source_table_columns available for {source_table}")
# If we still have no lineage, warn about it
if not column_lineage:
logger.warning(f"Could not create any column lineage for {source_table}")
return column_lineage
def _transform_column_name_for_platform(
@@ -1028,50 +979,6 @@ class FivetranStandardAPI(FivetranAccessInterface):
else:
logger.warning(f"Columns in unexpected format: {type(columns)}")
def _convert_column_dict_to_list(self, columns_dict: Dict) -> List[Dict]:
"""Convert column dictionary to list format for consistent processing."""
columns_list = []
for col_name, col_data in columns_dict.items():
if isinstance(col_data, dict):
col_data = col_data.copy()
col_data["name"] = col_name
columns_list.append(col_data)
else:
columns_list.append({"name": col_name})
logger.info(f"Converted dict format to list with {len(columns_list)} columns")
return columns_list
def _try_get_column_mapping_from_config(
self, source_table: str
) -> List[ColumnLineage]:
"""Try to get column mapping from configuration if available."""
column_lineage = []
# Check if there's a config attribute with column mapping info
if hasattr(self, "config") and self.config:
# Check for any attribute that might have column mappings
for attr_name in dir(self.config):
if "column_mapping" in attr_name.lower() and hasattr(
self.config, attr_name
):
mapping_attr = getattr(self.config, attr_name)
if isinstance(mapping_attr, dict) and source_table in mapping_attr:
column_mapping = mapping_attr.get(source_table, {})
logger.info(
f"Found config column mapping for {source_table}: {len(column_mapping)} columns"
)
for source_col, dest_col in column_mapping.items():
column_lineage.append(
ColumnLineage(
source_column=source_col,
destination_column=dest_col,
)
)
return column_lineage
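A hypothetical config shape this probe would discover; ExampleConfig and table_column_mapping are invented names, since the probe only requires "column_mapping" to appear somewhere in the attribute name:

class ExampleConfig:
    # source table -> {source column: destination column}
    table_column_mapping = {
        "public.customers": {"BillingStreet": "billing_street"},
    }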
def _get_destination_platform(self, connector: Connector) -> str:
"""
Determine the destination platform based on the configuration and connector details.