From 281b6df87568062c8d1d8e6b327d9e1d4236cbfd Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 24 Mar 2025 16:07:31 +0000 Subject: [PATCH] cll improvements --- .../ingestion/source/fivetran/fivetran.py | 291 ++++++++++- .../source/fivetran/fivetran_standard_api.py | 472 +++++++++++++++--- 2 files changed, 697 insertions(+), 66 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index 7105dda25c..e1a1bec34e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -1,5 +1,6 @@ import logging -from typing import Dict, Iterable, List, Optional +import re +from typing import Any, Dict, Iterable, List, Optional import datahub.emitter.mce_builder as builder from datahub.api.entities.datajob import DataFlow, DataJob @@ -25,7 +26,12 @@ from datahub.ingestion.source.fivetran.config import ( FivetranSourceReport, PlatformDetail, ) -from datahub.ingestion.source.fivetran.data_classes import Connector, Job, TableLineage +from datahub.ingestion.source.fivetran.data_classes import ( + ColumnLineage, + Connector, + Job, + TableLineage, +) from datahub.ingestion.source.fivetran.fivetran_access import ( create_fivetran_access, ) @@ -1139,6 +1145,270 @@ class FivetranSource(StatefulIngestionSourceBase): dpi = self._generate_dpi_from_job(job, datajob) yield from self._get_dpi_workunits(job, dpi) + def _enhance_missing_column_lineage(self, connector: Connector) -> None: + """ + Last-resort attempt to add column lineage to a connector's tables. + This tries various methods to find column information when all else has failed. + """ + if not connector.lineage: + return + + # Keep track of tables we've enhanced + enhanced_tables = 0 + + # Process each table that doesn't have column lineage + for idx, table_lineage in enumerate(connector.lineage): + if table_lineage.column_lineage: + continue # Skip tables that already have column lineage + + # Try to enhance this specific table's column lineage + if self._enhance_single_table_column_lineage(connector, idx, table_lineage): + enhanced_tables += 1 + + if enhanced_tables > 0: + logger.info( + f"Enhanced column lineage for {enhanced_tables} tables in connector {connector.connector_id}" + ) + else: + logger.warning( + f"Could not enhance column lineage for any tables in connector {connector.connector_id}" + ) + + def _enhance_single_table_column_lineage( + self, connector: Connector, idx: int, table_lineage: TableLineage + ) -> bool: + """Helper method to enhance column lineage for a single table to reduce complexity.""" + source_table = table_lineage.source_table + destination_table = table_lineage.destination_table + + if not source_table or not destination_table or "." not in source_table: + return False + + schema_name, table_name = source_table.split(".", 1) + + # Get destination platform + destination_platform = connector.additional_properties.get( + "destination_platform", "unknown" + ) + + try: + # Check if we're using the standard API implementation + from datahub.ingestion.source.fivetran.fivetran_standard_api import ( + FivetranStandardAPI, + ) + + if isinstance(self.fivetran_access, FivetranStandardAPI): + # Use the standard API's get_columns_from_api method + columns = self.fivetran_access._get_columns_from_api( + source_table, connector.connector_id + ) + if columns: + column_lineage = self._create_columns_lineage_from_data( + columns, destination_platform + ) + if column_lineage: + connector.lineage[idx].column_lineage = column_lineage + logger.info( + f"Added {len(column_lineage)} columns to lineage for {table_lineage.source_table} -> {table_lineage.destination_table}" + ) + return True + + # Try a different approach - use the connector schemas method + # This should work with any implementation of FivetranAccessInterface + schemas = [] + try: + # We need to get the schemas from the appropriate API + # Try to use list_connector_schemas indirectly through the fivetran_access + if hasattr(self.fivetran_access, "api_client") and hasattr( + self.fivetran_access.api_client, "list_connector_schemas" + ): + schemas = self.fivetran_access.api_client.list_connector_schemas( + connector.connector_id + ) + else: + # Depending on your implementation, there might be other ways to get schemas + # For now, we'll just log and continue + logger.warning( + f"No suitable method found to get schemas for connector {connector.connector_id}" + ) + except Exception as schema_e: + logger.warning( + f"Error getting schemas for connector {connector.connector_id}: {schema_e}" + ) + + if schemas: + return self._find_and_add_column_lineage( + connector, + idx, + schemas, + schema_name, + table_name, + destination_platform, + table_lineage, + ) + except Exception as e: + logger.warning(f"Error enhancing column lineage for {source_table}: {e}") + + return False + + def _find_and_add_column_lineage( + self, + connector: Connector, + idx: int, + schemas: List[Dict], + schema_name: str, + table_name: str, + destination_platform: str, + table_lineage: TableLineage, + ) -> bool: + """Find table in schemas and add column lineage if found.""" + for schema in schemas: + if schema.get("name") != schema_name: + continue + + for table in schema.get("tables", []): + if not isinstance(table, dict) or table.get("name") != table_name: + continue + + # Found the table, try to get columns + columns = self._get_columns_for_table( + connector, schema_name, table_name, table + ) + if not columns: + continue + + # Create column lineage + column_lineage = self._create_columns_lineage_from_data( + columns, destination_platform + ) + + if column_lineage: + connector.lineage[idx].column_lineage = column_lineage + logger.info( + f"Added {len(column_lineage)} columns to lineage for {table_lineage.source_table} -> {table_lineage.destination_table}" + ) + return True + + return False + + def _get_columns_for_table( + self, connector: Connector, schema_name: str, table_name: str, table: Dict + ) -> List[Any]: + """Get columns for a table, trying multiple methods.""" + # First try columns in table data + columns = table.get("columns", []) + if columns: + return columns + + # Try direct API call as fallback - need to handle the interface correctly + try: + # Check if we're using the standard API implementation + from datahub.ingestion.source.fivetran.fivetran_standard_api import ( + FivetranStandardAPI, + ) + + if isinstance(self.fivetran_access, FivetranStandardAPI): + # Use the standard API's method to get columns + return self.fivetran_access._get_columns_from_api( + f"{schema_name}.{table_name}", connector.connector_id + ) + + # If not using standard API, try alternative approach + if hasattr(self.fivetran_access, "api_client") and hasattr( + self.fivetran_access.api_client, "get_table_columns" + ): + columns = self.fivetran_access.api_client.get_table_columns( + connector.connector_id, schema_name, table_name + ) + if columns: + return columns + except Exception as e: + logger.debug(f"Failed to get columns via API: {e}") + + return [] + + def _create_columns_lineage_from_data( + self, columns: List[Any], destination_platform: str + ) -> List[ColumnLineage]: + """Create column lineage objects from raw column data.""" + column_lineage = [] + is_bigquery = destination_platform.lower() == "bigquery" + + # Process columns based on format + if isinstance(columns, list): + for column in columns: + col_name = self._extract_column_name(column) + if not col_name or col_name.startswith("_fivetran"): + continue + + # Get destination column name + dest_col_name = self._get_destination_column_name( + column, col_name, is_bigquery + ) + + column_lineage.append( + ColumnLineage( + source_column=col_name, destination_column=dest_col_name + ) + ) + # Handle dictionary format + elif isinstance(columns, dict): + for col_name, col_data in columns.items(): + if col_name.startswith("_fivetran"): + continue + + # Get destination column name + dest_col_name = self._get_destination_column_name_from_dict( + col_data, col_name, is_bigquery + ) + + column_lineage.append( + ColumnLineage( + source_column=col_name, destination_column=dest_col_name + ) + ) + + return column_lineage + + def _extract_column_name(self, column: Any) -> Optional[str]: + """Extract column name from column data.""" + if isinstance(column, dict): + return column.get("name") + elif isinstance(column, str): + return column + return None + + def _get_destination_column_name( + self, column: Any, col_name: str, is_bigquery: bool + ) -> str: + """Get destination column name, preferring name_in_destination if available.""" + if isinstance(column, dict) and "name_in_destination" in column: + return column.get("name_in_destination") + else: + return self._transform_column_name_for_platform(col_name, is_bigquery) + + def _get_destination_column_name_from_dict( + self, col_data: Any, col_name: str, is_bigquery: bool + ) -> str: + """Get destination column name from dictionary format column data.""" + if isinstance(col_data, dict) and "name_in_destination" in col_data: + return col_data.get("name_in_destination") + else: + return self._transform_column_name_for_platform(col_name, is_bigquery) + + def _transform_column_name_for_platform( + self, col_name: str, is_bigquery: bool + ) -> str: + """Transform column name based on destination platform.""" + if is_bigquery: + # For BigQuery, convert to snake_case + s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", col_name) + s2 = re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1) + return s2.lower() + else: + # For other platforms like Snowflake, typically uppercase + return col_name.upper() + def _get_connector_workunits( self, connector: Connector ) -> Iterable[MetadataWorkUnit]: @@ -1153,8 +1423,25 @@ class FivetranSource(StatefulIngestionSourceBase): # Store field lineage workunits to emit after dataset workunits field_lineage_workunits = [] + # We'll only consider a connector to have valid lineage if it has table lineage + # AND at least one of those tables has column lineage + has_column_lineage = any( + table_lineage.column_lineage for table_lineage in connector.lineage + ) + # Special handling for connectors with lineage but no job history if not connector.jobs and connector.lineage: + # Check if there's any column lineage to include + if not has_column_lineage: + logger.warning( + f"Connector {connector.connector_name} (ID: {connector.connector_id}) " + f"has {len(connector.lineage)} lineage entries but no column lineage. " + "Column-level lineage information will be missing." + ) + + # Try one last attempt to add column lineage + self._enhance_missing_column_lineage(connector) + logger.info( f"Connector {connector.connector_name} (ID: {connector.connector_id}) " f"has {len(connector.lineage)} lineage entries but no job history. " diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py index 9512539d73..e72283e8d5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py @@ -4,7 +4,7 @@ import difflib import logging import re -from typing import Dict, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Set, Tuple from datahub.configuration.common import AllowDenyPattern from datahub.ingestion.source.fivetran.config import ( @@ -235,84 +235,422 @@ class FivetranStandardAPI(FivetranAccessInterface): def _fill_connectors_lineage(self, connectors: List[Connector]) -> None: """ - Fill in lineage information for connectors by calling the API with enhanced diagnostics. - Ensures every connector with schema information gets lineage. + Fill in lineage information for all connectors with special attention to column lineage. """ + # Cache connectors for later use self._connector_cache = connectors + # First process explicitly to make sure lineage is extracted for connector in connectors: self._fill_connector_lineage(connector) + # Perform second pass for column lineage enhancement + self._enhance_column_lineage_for_connectors(connectors) + + def _enhance_column_lineage_for_connectors( + self, connectors: List[Connector] + ) -> None: + """Enhance column lineage for connectors that have tables without column information.""" + logger.info("Performing secondary column lineage enhancement pass") + + for connector in connectors: + try: + if not connector.lineage: + continue + + # Find tables that need column lineage enhancement + tables_missing_columns = self._find_tables_missing_columns(connector) + if not tables_missing_columns: + continue + + logger.info( + f"Enhancing column lineage for {len(tables_missing_columns)} tables in connector {connector.connector_id}" + ) + + # Get schemas if we don't have them already + schemas = self.api_client.list_connector_schemas(connector.connector_id) + if not schemas: + continue + + # Make sure we have column info + self._preload_all_columns(connector.connector_id, schemas) + + # Process each table missing columns + self._process_tables_missing_columns( + connector, tables_missing_columns, schemas + ) + + except Exception as e: + logger.error( + f"Error enhancing column lineage for connector {connector.connector_id}: {e}", + exc_info=True, + ) + + def _find_tables_missing_columns( + self, connector: Connector + ) -> List[Tuple[int, TableLineage]]: + """Find tables in a connector that don't have column lineage.""" + tables_missing_columns = [] + for idx, table_lineage in enumerate(connector.lineage): + if not table_lineage.column_lineage: + # Track tables that need column lineage enhancement + tables_missing_columns.append((idx, table_lineage)) + return tables_missing_columns + + def _process_tables_missing_columns( + self, + connector: Connector, + tables_missing_columns: List[Tuple[int, TableLineage]], + schemas: List[Dict], + ) -> None: + """Process tables that are missing column information.""" + for idx, table_lineage in tables_missing_columns: + source_table = table_lineage.source_table + if "." not in source_table: + continue + + schema_name, table_name = source_table.split(".", 1) + + # Find this table in the schemas + self._find_and_enhance_table_columns( + connector, idx, table_lineage, schema_name, table_name, schemas + ) + + def _find_and_enhance_table_columns( + self, + connector: Connector, + idx: int, + table_lineage: TableLineage, + schema_name: str, + table_name: str, + schemas: List[Dict], + ) -> None: + """Find a table in schemas and enhance its column information.""" + for schema in schemas: + if schema.get("name") != schema_name: + continue + + for table in schema.get("tables", []): + if not isinstance(table, dict) or table.get("name") != table_name: + continue + + # Found matching table, extract column lineage + self._extract_and_set_column_lineage( + connector, idx, table_lineage, table + ) + return # Found our table, no need to continue searching + + def _extract_and_set_column_lineage( + self, connector: Connector, idx: int, table_lineage: TableLineage, table: Dict + ) -> None: + """Extract column lineage from table and set it on the lineage object.""" + # Found matching table, extract column lineage + columns = table.get("columns", []) + if not columns: + return + + # Determine destination platform + destination_platform = self._get_destination_platform(connector) + + # Extract column lineage + column_lineage = self._extract_column_lineage_from_columns( + columns, destination_platform + ) + + if column_lineage: + logger.info( + f"Added {len(column_lineage)} columns to lineage for {table_lineage.source_table}" + ) + connector.lineage[idx].column_lineage = column_lineage + + def _extract_column_lineage_from_columns( + self, columns: List[Any], destination_platform: str + ) -> List[ColumnLineage]: + """Extract column lineage from a list of columns.""" + column_lineage = [] + is_bigquery = destination_platform.lower() == "bigquery" + + if not isinstance(columns, list): + return column_lineage + + for column in columns: + col_name = None + if isinstance(column, dict): + col_name = column.get("name") + elif isinstance(column, str): + col_name = column + + if col_name and not col_name.startswith("_fivetran"): + # Get destination column name - prefer name_in_destination if available + dest_col_name = None + if isinstance(column, dict) and "name_in_destination" in column: + dest_col_name = column.get("name_in_destination") + + # If no name_in_destination, transform based on platform + if not dest_col_name: + dest_col_name = self._transform_column_name_for_platform( + col_name, is_bigquery + ) + + column_lineage.append( + ColumnLineage( + source_column=col_name, destination_column=dest_col_name + ) + ) + + return column_lineage + def _create_synthetic_lineage( self, connector: Connector, schemas: List[dict], destination_platform: str ) -> None: """Create synthetic lineage for a connector based just on schema and table information.""" lineage_list = [] + # Process each schema and its tables for schema in schemas: schema_name = schema.get("name", "") if not schema_name: continue - for table in schema.get("tables", []): - if not isinstance(table, dict): - continue - - table_name = table.get("name", "") - if not table_name or not table.get("enabled", True): - continue - - # Create source table identifier - source_table = f"{schema_name}.{table_name}" - - # Get destination names - dest_schema = self._get_destination_schema_name( - schema_name, destination_platform - ) - dest_table = self._get_destination_table_name( - table_name, destination_platform - ) - destination_table = f"{dest_schema}.{dest_table}" - - # Create synthetic column lineage if we have column info - column_lineage = [] - columns = table.get("columns", []) - - if isinstance(columns, list): - for column in columns: - col_name = None - if isinstance(column, dict): - col_name = column.get("name") - elif isinstance(column, str): - col_name = column - - if col_name and not col_name.startswith("_fivetran"): - is_bigquery = destination_platform.lower() == "bigquery" - dest_col = self._transform_column_name_for_platform( - col_name, is_bigquery - ) - column_lineage.append( - ColumnLineage( - source_column=col_name, destination_column=dest_col - ) - ) - - # Add this table's lineage - lineage_list.append( - TableLineage( - source_table=source_table, - destination_table=destination_table, - column_lineage=column_lineage, - ) - ) + lineage_entries = self._process_schema_for_synthetic_lineage( + schema, schema_name, destination_platform + ) + lineage_list.extend(lineage_entries) if lineage_list: logger.info( f"Created {len(lineage_list)} synthetic table lineage entries for connector {connector.connector_id}" ) - # Set the lineage directly on the connector instead of using _lineage_cache + # Store the lineage in the connector connector.lineage = lineage_list + def _process_schema_for_synthetic_lineage( + self, schema: Dict, schema_name: str, destination_platform: str + ) -> List[TableLineage]: + """Process a schema to create synthetic lineage entries.""" + lineage_entries = [] + + tables = schema.get("tables", []) + if not isinstance(tables, list): + return lineage_entries + + for table in tables: + if not isinstance(table, dict): + continue + + lineage_entry = self._process_table_for_synthetic_lineage( + table, schema, schema_name, destination_platform + ) + + if lineage_entry: + lineage_entries.append(lineage_entry) + + return lineage_entries + + def _process_table_for_synthetic_lineage( + self, table: Dict, schema: Dict, schema_name: str, destination_platform: str + ) -> Optional[TableLineage]: + """Process a table to create a synthetic lineage entry.""" + table_name = table.get("name", "") + if not table_name or not table.get("enabled", True): + return None + + # Create source and destination table identifiers + source_table, destination_table = self._create_source_dest_table_names( + table, schema, schema_name, table_name, destination_platform + ) + + # Create column lineage + column_lineage = self._create_synthetic_column_lineage( + table, source_table, destination_platform + ) + + # Add this table's lineage if we have table information + if source_table and destination_table: + logger.info( + f"Creating lineage: {source_table} -> {destination_table} with {len(column_lineage)} columns" + ) + return TableLineage( + source_table=source_table, + destination_table=destination_table, + column_lineage=column_lineage, + ) + + return None + + def _create_source_dest_table_names( + self, + table: Dict, + schema: Dict, + schema_name: str, + table_name: str, + destination_platform: str, + ) -> Tuple[str, str]: + """Create source and destination table names with proper casing and naming conventions.""" + # Create source table identifier + source_table = f"{schema_name}.{table_name}" + + # Get destination names - using name_in_destination if available + schema_name_in_destination = schema.get("name_in_destination") + dest_schema = ( + schema_name_in_destination + if schema_name_in_destination + else self._get_destination_schema_name(schema_name, destination_platform) + ) + + table_name_in_destination = table.get("name_in_destination") + dest_table = ( + table_name_in_destination + if table_name_in_destination + else self._get_destination_table_name(table_name, destination_platform) + ) + + destination_table = f"{dest_schema}.{dest_table}" + + return source_table, destination_table + + def _create_synthetic_column_lineage( + self, table: Dict, source_table: str, destination_platform: str + ) -> List[ColumnLineage]: + """Create synthetic column lineage for a table.""" + column_lineage = [] + + # First try with list format columns + columns = table.get("columns", []) + if isinstance(columns, list): + column_lineage = self._process_list_columns_for_lineage( + columns, destination_platform + ) + + # Try to extract columns from dict format if no columns found yet + if not column_lineage and isinstance(table.get("columns"), dict): + column_dict = table.get("columns", {}) + column_lineage = self._process_dict_columns_for_lineage( + column_dict, destination_platform + ) + + # If we still don't have columns but we know the source table, + # try to get columns from API or similar tables + if not column_lineage: + logger.info( + f"No columns found for {source_table}, attempting additional discovery" + ) + column_lineage = self._get_columns_from_additional_sources( + source_table, destination_platform + ) + + return column_lineage + + def _process_list_columns_for_lineage( + self, columns: List[Any], destination_platform: str + ) -> List[ColumnLineage]: + """Process list format columns for lineage.""" + column_lineage = [] + + for column in columns: + if not column: + continue + + col_name = None + if isinstance(column, dict): + col_name = column.get("name") + elif isinstance(column, str): + col_name = column + + if not col_name or col_name.startswith("_fivetran"): + continue + + # Get the destination column name + dest_col_name = self._get_destination_column_name( + column, col_name, destination_platform + ) + + # Add to lineage + column_lineage.append( + ColumnLineage(source_column=col_name, destination_column=dest_col_name) + ) + + return column_lineage + + def _get_destination_column_name( + self, column: Any, col_name: str, destination_platform: str + ) -> str: + """Get destination column name with proper handling of name_in_destination.""" + # First check for name_in_destination + dest_col_name = None + if isinstance(column, dict) and "name_in_destination" in column: + dest_col_name = column.get("name_in_destination") + logger.debug( + f"Using name_in_destination {dest_col_name} for column {col_name}" + ) + + # If no explicit mapping, transform based on destination platform + if not dest_col_name: + is_bigquery = destination_platform.lower() == "bigquery" + dest_col_name = self._transform_column_name_for_platform( + col_name, is_bigquery + ) + logger.debug(f"Transformed column name: {col_name} -> {dest_col_name}") + + return dest_col_name + + def _process_dict_columns_for_lineage( + self, column_dict: Dict, destination_platform: str + ) -> List[ColumnLineage]: + """Process dictionary format columns for lineage.""" + column_lineage = [] + + logger.info("Extracting columns from dictionary format") + + for col_name, col_info in column_dict.items(): + if col_name.startswith("_fivetran"): + continue + + # Get destination column name + dest_col_name = None + if isinstance(col_info, dict) and "name_in_destination" in col_info: + dest_col_name = col_info.get("name_in_destination") + + if not dest_col_name: + is_bigquery = destination_platform.lower() == "bigquery" + dest_col_name = self._transform_column_name_for_platform( + col_name, is_bigquery + ) + + column_lineage.append( + ColumnLineage(source_column=col_name, destination_column=dest_col_name) + ) + + return column_lineage + + def _get_columns_from_additional_sources( + self, source_table: str, destination_platform: str + ) -> List[ColumnLineage]: + """Try to get columns from additional sources when normal methods fail.""" + column_lineage = [] + + # Try to infer from API + columns_from_api = self._get_columns_from_api(source_table) + if columns_from_api: + logger.info( + f"Found {len(columns_from_api)} columns from API for {source_table}" + ) + + is_bigquery = destination_platform.lower() == "bigquery" + for col in columns_from_api: + col_name = col.get("name") if isinstance(col, dict) else col + if col_name and not col_name.startswith("_fivetran"): + dest_col_name = self._transform_column_name_for_platform( + col_name, is_bigquery + ) + column_lineage.append( + ColumnLineage( + source_column=col_name, destination_column=dest_col_name + ) + ) + + return column_lineage + def _process_connector( self, api_connector: Dict, @@ -1018,8 +1356,13 @@ class FivetranStandardAPI(FivetranAccessInterface): return column_lineage - def _get_columns_from_api(self, source_table: str) -> List[Dict]: - """Get columns directly from Fivetran API for a table.""" + def _get_columns_from_api( + self, source_table: str, connector_id: str = None + ) -> List[Dict]: + """ + Get columns directly from Fivetran API for a table. + Enhanced to use provided connector_id when available. + """ # Parse schema and table name if "." not in source_table: logger.warning( @@ -1029,23 +1372,24 @@ class FivetranStandardAPI(FivetranAccessInterface): schema_name, table_name = source_table.split(".", 1) - # Find the connector ID for this source table - connector_id = self._find_connector_id_for_source_table(source_table) - if not connector_id: + # Use provided connector_id or try to find it + conn_id = connector_id + if not conn_id: + conn_id = self._find_connector_id_for_source_table(source_table) + + if not conn_id: logger.warning( f"Could not find connector ID for source table {source_table}" ) return [] - logger.info( - f"Found connector ID {connector_id} for source table {source_table}" - ) + logger.info(f"Using connector ID {conn_id} for source table {source_table}") logger.info(f"Querying API for columns of {schema_name}.{table_name}") # Call the API to get columns using the direct columns endpoint try: columns = self.api_client.get_table_columns( - connector_id, schema_name, table_name + conn_id, schema_name, table_name ) if columns: logger.info(