From d3516b234b679f693182bb3361a6ab34f928c4bd Mon Sep 17 00:00:00 2001
From: Jonny Dixon
Date: Thu, 20 Mar 2025 22:45:59 +0000
Subject: [PATCH] remove unused methods

---
 .../ingestion/source/fivetran/fivetran.py    | 166 +--------
 .../source/fivetran/fivetran_standard_api.py | 339 +++++++-----------
 2 files changed, 124 insertions(+), 381 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
index 6f0b66aa74..58d6d5bfbf 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
@@ -1,7 +1,5 @@
-import difflib
 import logging
-import re
-from typing import Dict, Iterable, List, Optional, Tuple
+from typing import Dict, Iterable, List, Optional
 
 import datahub.emitter.mce_builder as builder
 from datahub.api.entities.datajob import DataFlow, DataJob
@@ -347,104 +345,6 @@ class FivetranSource(StatefulIngestionSourceBase):
         )
         return None
 
-    def _normalize_column_name(self, column_name: str) -> str:
-        """Normalize column name for comparison by removing non-alphanumeric chars and converting to lowercase."""
-        # Remove non-alphanumeric characters and convert to lowercase
-        normalized = re.sub(r"[^a-zA-Z0-9]", "", column_name).lower()
-        return normalized
-
-    def _transform_column_name_for_platform(
-        self, column_name: str, is_bigquery: bool
-    ) -> str:
-        """Transform column name based on the destination platform with better handling of edge cases."""
-        if not column_name:
-            return ""
-
-        if is_bigquery:
-            # For BigQuery:
-            # 1. Convert to lowercase
-            # 2. Replace camelCase with snake_case
-            # 3. Clean up any invalid characters
-            import re
-
-            # Step 1: Convert camelCase to snake_case with regex
-            s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", column_name)
-            s2 = re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1)
-
-            # Step 2: lowercase and replace non-alphanumeric with underscore
-            transformed = re.sub(r"[^a-zA-Z0-9_]", "_", s2.lower())
-
-            # Step 3: Remove leading/trailing underscores and collapse multiple underscores
-            transformed = re.sub(r"_+", "_", transformed).strip("_")
-
-            # Log the transformation for debugging
-            if transformed != column_name.lower():
-                logger.debug(f"Transformed column: {column_name} -> {transformed}")
-
-            return transformed
-        else:
-            # For other platforms like Snowflake, typically uppercase
-            return column_name.upper()
-
-    def _find_best_fuzzy_match(
-        self, source_col: str, source_norm: str, dest_columns: List[Tuple[str, str]]
-    ) -> Optional[str]:
-        """Find best fuzzy match for a source column from destination columns.
-
-        Args:
-            source_col: Original source column name
-            source_norm: Normalized source column name
-            dest_columns: List of (original_dest, normalized_dest) tuples
-
-        Returns:
-            Best matching destination column name or None if no good match found
-        """
-        # First try to match normalized versions with high cutoff
-        dest_norms = [dest_norm for _, dest_norm in dest_columns]
-        matches = difflib.get_close_matches(source_norm, dest_norms, n=1, cutoff=0.8)
-
-        if matches:
-            # Find original dest column with this normalized value
-            matched_norm = matches[0]
-            for dest_col, dest_norm in dest_columns:
-                if dest_norm == matched_norm:
-                    return dest_col
-
-        # If no high-quality match found, try a lower threshold on original names
-        # This helps with acronyms and abbreviated field names
-        dest_cols = [dest_col for dest_col, _ in dest_columns]
-        matches = difflib.get_close_matches(source_col, dest_cols, n=1, cutoff=0.6)
-
-        if matches:
-            return matches[0]
-
-        # Try special patterns like converting "someField" to "some_field"
-        snake_case = re.sub("([a-z0-9])([A-Z])", r"\1_\2", source_col).lower()
-        for dest_col, _ in dest_columns:
-            if dest_col.lower() == snake_case:
-                return dest_col
-
-        # If source_col contains words that are also in a destination column, consider it a match
-        # This helps with "BillingStreet" matching "billing_street" or "street_billing"
-        words = re.findall(r"[A-Z][a-z]+|[a-z]+|[0-9]+", source_col)
-        if words:
-            word_matches = {}
-            for dest_col, _ in dest_columns:
-                # Count how many words from source appear in destination
-                dest_words = re.findall(r"[A-Z][a-z]+|[a-z]+|[0-9]+", dest_col)
-                common_words = len(
-                    set(w.lower() for w in words) & set(w.lower() for w in dest_words)
-                )
-                if common_words > 0:
-                    word_matches[dest_col] = common_words
-
-            # If we found matches based on common words, return the one with most matches
-            if word_matches:
-                return max(word_matches.items(), key=lambda x: x[1])[0]
-
-        # No good match found
-        return None
-
     def _create_column_lineage(
         self,
         lineage: TableLineage,
@@ -536,70 +436,6 @@ class FivetranSource(StatefulIngestionSourceBase):
                 f"Failed to create column lineage for {column_lineage.source_column} -> {column_lineage.destination_column}: {e}"
             )
 
-    def _create_field_lineage_mcp(
-        self,
-        source_urn: DatasetUrn,
-        dest_urn: DatasetUrn,
-        lineage_field_map: Dict[str, List[str]],
-    ) -> Optional[MetadataWorkUnit]:
-        """
-        Create field-level lineage between datasets using MetadataChangeProposal.
-
-        Args:
-            source_urn: Source dataset URN
-            dest_urn: Destination dataset URN
-            lineage_field_map: Map of destination field URNs to lists of source field URNs
-        """
-        try:
-            from datahub.emitter.mcp import MetadataChangeProposalWrapper
-            from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
-                DatasetLineageType,
-                FineGrainedLineage,
-                FineGrainedLineageDownstreamType,
-                FineGrainedLineageUpstreamType,
-                Upstream,
-                UpstreamLineage,
-            )
-
-            # Create the upstream relationship
-            upstream = Upstream(
-                dataset=str(source_urn), type=DatasetLineageType.TRANSFORMED
-            )
-
-            # Create fine-grained lineages for each field mapping
-            fine_grained_lineages = []
-
-            for dest_field, source_fields in lineage_field_map.items():
-                fine_grained_lineages.append(
-                    FineGrainedLineage(
-                        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
-                        upstreams=source_fields,
-                        downstreamType=FineGrainedLineageDownstreamType.FIELD,
-                        downstreams=[dest_field],
-                    )
-                )
-
-            # Create the lineage aspect
-            upstream_lineage = UpstreamLineage(
-                upstreams=[upstream], fineGrainedLineages=fine_grained_lineages
-            )
-
-            # Create and emit the MCP
-            lineage_mcp = MetadataChangeProposalWrapper(
-                entityUrn=str(dest_urn),
-                aspect=upstream_lineage,
-            )
-
-            # Now create a workunit from this MCP
-            wu = MetadataWorkUnit(id=f"{dest_urn}-field-lineage", mcp=lineage_mcp)
-
-            # Return the workunit - it will be collected and emitted by the main process
-            return wu
-
-        except Exception as e:
-            logger.error(f"Error creating field-level lineage MCP: {e}", exc_info=True)
-            return None
-
     def _build_lineage_properties(
         self,
         connector: Connector,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py
index d70eb84f22..b8cfd6099a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py
@@ -473,6 +473,8 @@
         """
         Fill in lineage information for connectors by calling the API with enhanced diagnostics and robust error handling.
         """
+        self._connector_cache = connectors
+
         for connector in connectors:
             try:
                 logger.info(
@@ -695,14 +697,128 @@
 
         return source_columns
 
-    def _process_columns_from_list(
-        self, columns: List[Any], is_bigquery: bool, source_table: str
+    def _extract_column_lineage(
+        self,
+        table: Dict,
+        source_table: str,
+        destination_platform: str,
+        source_table_columns: Dict[str, Dict[str, str]],
     ) -> List[ColumnLineage]:
-        """
-        Process columns from list format and create column lineage.
-        Extracted from _extract_column_lineage to reduce complexity.
- """ + """Extract column lineage with enhanced retrieval and mapping.""" + logger.info( + f"Extracting column lineage for {source_table} to {destination_platform}" + ) + + # Get columns from different sources + columns = self._get_columns_from_sources( + table, source_table, source_table_columns + ) + + # If we still don't have any columns, return empty list + if not columns: + logger.warning(f"No column information available for {source_table}") + return [] + + # Create column lineage + column_lineage = self._create_column_lineage_from_columns( + columns, source_table, destination_platform + ) + + # Log results + if column_lineage: + logger.info( + f"Created {len(column_lineage)} column lineage entries for {source_table}" + ) + else: + logger.warning(f"Failed to create any column lineage for {source_table}") + + return column_lineage + + def _get_columns_from_sources( + self, + table: Dict, + source_table: str, + source_table_columns: Dict[str, Dict[str, str]], + ) -> List[Dict]: + """Get columns from various sources.""" + # 1. First try to get columns from the table data + columns = table.get("columns", []) + + # Handle different column formats + if isinstance(columns, dict): + # Convert dict format to list + columns = self._convert_column_dict_to_list(columns) + + # 2. If no columns found, try to retrieve them from the schemas endpoint + if not columns: + columns = self._get_columns_from_schemas_endpoint(source_table) + + # 3. If still no columns, try source_table_columns + if not columns and source_table in source_table_columns: + logger.info(f"Using columns from source_table_columns for {source_table}") + columns = [ + {"name": col_name, "type": col_type} + for col_name, col_type in source_table_columns[source_table].items() + ] + + return columns + + def _convert_column_dict_to_list(self, columns_dict: Dict) -> List[Dict]: + """Convert column dictionary to list format.""" + columns_list = [] + for col_name, col_data in columns_dict.items(): + if isinstance(col_data, dict): + col_data = col_data.copy() + col_data["name"] = col_name + columns_list.append(col_data) + else: + columns_list.append({"name": col_name}) + return columns_list + + def _get_columns_from_schemas_endpoint(self, source_table: str) -> List[Dict]: + """Try to get columns from the schemas endpoint.""" + columns: List[Dict] = [] + + if not hasattr(self.api_client, "get_table_columns"): + return columns + + logger.info("No columns found in table data, trying schemas endpoint") + schema_name, table_name = None, None + if "." 
+            schema_name, table_name = source_table.split(".", 1)
+
+        if not (schema_name and table_name):
+            return columns
+
+        try:
+            connector_id = self._find_connector_id_for_source_table(source_table)
+
+            if connector_id:
+                columns = self.api_client.get_table_columns(
+                    connector_id, schema_name, table_name
+                )
+                logger.info(f"Retrieved {len(columns)} columns from schemas endpoint")
+        except Exception as e:
+            logger.warning(f"Failed to get columns from schemas endpoint: {e}")
+
+        return columns
+
+    def _find_connector_id_for_source_table(self, source_table: str) -> Optional[str]:
+        """Find the connector ID for a source table."""
+        for conn in getattr(self, "_connector_cache", []):
+            if hasattr(conn, "connector_id") and source_table in str(conn.lineage):
+                return conn.connector_id
+        return None
+
+    def _create_column_lineage_from_columns(
+        self,
+        columns: List[Dict],
+        source_table: str,
+        destination_platform: str,
+    ) -> List[ColumnLineage]:
+        """Create column lineage objects from column data."""
         column_lineage = []
+        is_bigquery = destination_platform.lower() == "bigquery"
 
         for column in columns:
             col_name = None
@@ -711,11 +827,7 @@
             elif isinstance(column, dict):
                 col_name = column.get("name")
             elif isinstance(column, str):
                 col_name = column
 
-            if not col_name:
-                continue
-
-            # Skip Fivetran system columns
-            if col_name.startswith("_fivetran"):
+            if not col_name or col_name.startswith("_fivetran"):
                 continue
 
             # Get destination column name - prefer name_in_destination if available
@@ -741,167 +853,6 @@
                 )
             )
 
-        logger.info(
-            f"Created {len(column_lineage)} column lineage entries for {source_table}"
-        )
-        return column_lineage
-
-    def _process_columns_from_source(
-        self, source_columns: Dict[str, str], is_bigquery: bool, source_table: str
-    ) -> List[ColumnLineage]:
-        """
-        Create column lineage from source column information.
-        Extracted from _extract_column_lineage to reduce complexity.
-        """
-        column_lineage = []
-
-        logger.info(f"Using {len(source_columns)} columns from source table schema")
-
-        # Create lineage for each source column
-        for col_name in source_columns:
-            if col_name.startswith("_fivetran"):
-                continue
-
-            # Transform destination column name based on platform
-            dest_col_name = self._transform_column_name_for_platform(
-                col_name, is_bigquery
-            )
-
-            column_lineage.append(
-                ColumnLineage(
-                    source_column=col_name,
-                    destination_column=dest_col_name,
-                )
-            )
-
-        logger.info(f"Created {len(column_lineage)} fallback column lineage entries")
-        return column_lineage
-
-    def _extract_column_lineage(
-        self,
-        table: Dict,
-        source_table: str,
-        destination_platform: str,
-        source_table_columns: Dict[str, Dict[str, str]],
-    ) -> List[ColumnLineage]:
-        """
-        Extract column-level lineage for a table with improved matching techniques.
-
-        Args:
-            table: Table data from API
-            source_table: Full source table name (schema.table)
-            destination_platform: Destination platform type
-            source_table_columns: Dict mapping table names to column information
-
-        Returns:
-            List of ColumnLineage objects
-        """
-        # Get detailed logging for debugging
-        logger.info(
-            f"Extracting column lineage for {source_table} to {destination_platform}"
-        )
-        logger.debug(f"Table data keys: {list(table.keys())}")
-
-        # Get columns from the API response
-        columns = table.get("columns", [])
-
-        # Log what we're working with
-        if isinstance(columns, list):
-            logger.info(f"Found {len(columns)} columns in list format")
-        elif isinstance(columns, dict):
-            logger.info(f"Found {len(columns)} columns in dict format")
-            # Convert dict format to list if needed
-            columns_list = []
-            for col_name, col_data in columns.items():
-                if isinstance(col_data, dict):
-                    col_data = col_data.copy()
-                    col_data["name"] = col_name
-                    columns_list.append(col_data)
-                else:
-                    columns_list.append({"name": col_name})
-            columns = columns_list
-            logger.info(f"Converted dict format to list with {len(columns)} columns")
-        else:
-            logger.warning(f"Columns in unexpected format: {type(columns)}")
-
-        is_bigquery = destination_platform.lower() == "bigquery"
-        column_lineage = []
-
-        # First try direct mapping from columns if available
-        if isinstance(columns, list) and columns:
-            for column in columns:
-                col_name = None
-                if isinstance(column, dict):
-                    col_name = column.get("name")
-                elif isinstance(column, str):
-                    col_name = column
-
-                if not col_name or col_name.startswith("_fivetran"):
-                    continue
-
-                # Get destination column name - prefer name_in_destination if available
-                dest_col_name = None
-                if isinstance(column, dict) and "name_in_destination" in column:
-                    dest_col_name = column.get("name_in_destination")
-                    logger.debug(
-                        f"Using name_in_destination: {col_name} -> {dest_col_name}"
-                    )
-
-                # If no name_in_destination, transform based on platform
-                if not dest_col_name:
-                    dest_col_name = self._transform_column_name_for_platform(
-                        col_name, is_bigquery
-                    )
-                    logger.debug(f"Transformed name: {col_name} -> {dest_col_name}")
-
-                # Add to lineage
-                column_lineage.append(
-                    ColumnLineage(
-                        source_column=col_name,
-                        destination_column=dest_col_name,
-                    )
-                )
-
-        # If we got column lineage, return it
-        if column_lineage:
-            logger.info(
-                f"Created {len(column_lineage)} column lineage entries using direct mapping"
-            )
-            return column_lineage
-
-        # No direct column mapping, try to derive from source_table_columns
-        if source_table in source_table_columns:
-            logger.info(
-                f"Attempting to derive column lineage from source_table_columns for {source_table}"
-            )
-            source_cols = source_table_columns[source_table]
-
-            for col_name in source_cols:
-                if col_name.startswith("_fivetran"):
-                    continue
-
-                # Transform destination column name based on platform
-                dest_col_name = self._transform_column_name_for_platform(
-                    col_name, is_bigquery
-                )
-
-                column_lineage.append(
-                    ColumnLineage(
-                        source_column=col_name,
-                        destination_column=dest_col_name,
-                    )
-                )
-
-            logger.info(
-                f"Created {len(column_lineage)} column lineage entries using source_table_columns"
-            )
-        else:
-            logger.warning(f"No source_table_columns available for {source_table}")
-
-        # If we still have no lineage, warn about it
-        if not column_lineage:
-            logger.warning(f"Could not create any column lineage for {source_table}")
-
         return column_lineage
 
     def _transform_column_name_for_platform(
@@ -1028,50 +979,6 @@ class FivetranStandardAPI(FivetranAccessInterface):
         else:
             logger.warning(f"Columns in unexpected format: {type(columns)}")
 
-    def _convert_column_dict_to_list(self, columns_dict: Dict) -> List[Dict]:
-        """Convert column dictionary to list format for consistent processing."""
-        columns_list = []
-        for col_name, col_data in columns_dict.items():
-            if isinstance(col_data, dict):
-                col_data = col_data.copy()
-                col_data["name"] = col_name
-                columns_list.append(col_data)
-            else:
-                columns_list.append({"name": col_name})
-
-        logger.info(f"Converted dict format to list with {len(columns_list)} columns")
-        return columns_list
-
-    def _try_get_column_mapping_from_config(
-        self, source_table: str
-    ) -> List[ColumnLineage]:
-        """Try to get column mapping from configuration if available."""
-        column_lineage = []
-
-        # Check if there's a config attribute with column mapping info
-        if hasattr(self, "config") and self.config:
-            # Check for any attribute that might have column mappings
-            for attr_name in dir(self.config):
-                if "column_mapping" in attr_name.lower() and hasattr(
-                    self.config, attr_name
-                ):
-                    mapping_attr = getattr(self.config, attr_name)
-                    if isinstance(mapping_attr, dict) and source_table in mapping_attr:
-                        column_mapping = mapping_attr.get(source_table, {})
-                        logger.info(
-                            f"Found config column mapping for {source_table}: {len(column_mapping)} columns"
-                        )
-
-                        for source_col, dest_col in column_mapping.items():
-                            column_lineage.append(
-                                ColumnLineage(
-                                    source_column=source_col,
-                                    destination_column=dest_col,
-                                )
-                            )
-
-        return column_lineage
-
     def _get_destination_platform(self, connector: Connector) -> str:
         """
         Determine the destination platform based on the configuration and connector details.