This commit is contained in:
Jonny Dixon 2025-03-20 23:49:47 +00:00
parent 1d7f4acb8b
commit 68effd743a
4 changed files with 23 additions and 403 deletions

View File

@ -1220,7 +1220,7 @@ class FivetranAPIClient:
# If we're missing end time but have start time, use current time for recent jobs
if started_at and not completed_at:
if (time.time() - started_at) < (24 * 60 * 60): # Within last 24 hours
if (time.time() - started_at) < (24 * 60 * 60 * 7):
completed_at = int(time.time())
else:
# Skip old jobs without completion times
@ -1411,86 +1411,6 @@ class FivetranAPIClient:
logger.error(f"Failed to extract lineage for connector {connector_id}: {e}")
return []
def _extract_lineage_with_special_handling(
    self, connector: Connector, source_platform: str
) -> List[TableLineage]:
    """
    Extract lineage with special handling for certain source platforms
    like Salesforce that need additional processing.

    Returns one TableLineage per enabled table; on any error logs and
    returns an empty list rather than raising.
    """
    connector_id = connector.connector_id
    results: List[TableLineage] = []
    try:
        schemas = self.list_connector_schemas(connector_id)
        # Resolve the destination platform; guard against a literal None
        # value stored under the key, then fall back to snowflake.
        raw_dest = connector.additional_properties.get("destination_platform", "")
        dest_platform = str(raw_dest) if raw_dest is not None else ""
        if not dest_platform:
            dest_platform = "snowflake"
        for schema in schemas:
            schema_name = schema.get("name", "")
            for table in schema.get("tables", []):
                # Disabled tables are not synced, so they yield no lineage.
                if not table.get("enabled", False):
                    continue
                table_name = table.get("name", "")
                source_table = f"{schema_name}.{table_name}"
                # Destination names follow platform-specific conventions.
                dest_schema = self._get_destination_schema_name(
                    schema_name, dest_platform
                )
                dest_table = self._get_destination_table_name(
                    table_name, dest_platform
                )
                destination_full = f"{dest_schema}.{dest_table}"
                col_lineage = []
                for col in table.get("columns", []):
                    col_name = col.get("name", "")
                    col_lineage.append(
                        ColumnLineage(
                            source_column=col_name,
                            destination_column=self._get_destination_column_name(
                                col_name, dest_platform
                            ),
                        )
                    )
                results.append(
                    TableLineage(
                        source_table=source_table,
                        destination_table=destination_full,
                        column_lineage=col_lineage,
                    )
                )
        return results
    except Exception as e:
        logger.error(f"Failed to extract special lineage for {connector_id}: {e}")
        return []
def _get_destination_schema_name(
self, schema_name: str, destination_platform: str
) -> str:
@ -1537,66 +1457,3 @@ class FivetranAPIClient:
else:
# For other platforms like Snowflake, typically uppercase
return column_name.upper()
def _build_lineage_from_schemas(
    self, schemas: List[Dict], connector: Connector
) -> List[TableLineage]:
    """
    Build lineage information from schemas for a generic connector.

    Args:
        schemas: Schema dicts from the Fivetran schemas endpoint; each has
            a "name" and a list of "tables" carrying their "columns".
        connector: Connector whose additional_properties may carry the
            destination platform name.

    Returns:
        A TableLineage entry for every enabled table, including
        column-level lineage.
    """
    destination_platform_value = connector.additional_properties.get(
        "destination_platform", ""
    )
    # Coerce to str so the destination-name helpers never receive None
    # (consistent with _extract_lineage_with_special_handling).
    destination_platform = (
        str(destination_platform_value)
        if destination_platform_value is not None
        else ""
    )
    lineage_list: List[TableLineage] = []
    for schema in schemas:
        schema_name = schema.get("name", "")
        for table in schema.get("tables", []):
            table_name = table.get("name", "")
            # Disabled tables are not synced, so they produce no lineage.
            if not table.get("enabled", False):
                continue
            # Create source table name
            source_table = f"{schema_name}.{table_name}"
            # Destination naming conventions differ per platform
            # (e.g. BigQuery lowercase vs Snowflake uppercase).
            destination_schema = self._get_destination_schema_name(
                schema_name, destination_platform
            )
            destination_table = self._get_destination_table_name(
                table_name, destination_platform
            )
            destination_table_full = f"{destination_schema}.{destination_table}"
            # Extract column information
            column_lineage = []
            for column in table.get("columns", []):
                column_name = column.get("name", "")
                # Adjust destination column name based on platform
                dest_column_name = self._get_destination_column_name(
                    column_name, destination_platform
                )
                column_lineage.append(
                    ColumnLineage(
                        source_column=column_name,
                        destination_column=dest_column_name,
                    )
                )
            lineage_list.append(
                TableLineage(
                    source_table=source_table,
                    destination_table=destination_table_full,
                    column_lineage=column_lineage,
                )
            )
    return lineage_list

View File

@ -553,9 +553,28 @@ class FivetranStandardAPI(FivetranAccessInterface):
f"Table {schema_name}.{table_name} has {len(columns)} columns"
)
# DIAGNOSTIC: Print a sample of column names
column_names = [col.get("name", "unknown") for col in columns[:5]]
logger.info(f"Sample columns: {column_names}")
# DIAGNOSTIC: Print a sample of column names with more robust type checking
try:
if isinstance(columns, list) and columns:
# Get up to 5 columns, but check they're the right type first
sample_columns = columns[: min(5, len(columns))]
column_names = []
for col in sample_columns:
if isinstance(col, dict) or hasattr(col, "get"):
column_names.append(col.get("name", "unknown"))
elif isinstance(col, str):
column_names.append(col)
else:
column_names.append(f"({type(col).__name__})")
logger.info(f"Sample columns: {column_names}")
elif isinstance(columns, dict):
# Handle dictionary of columns
sample_keys = list(columns.keys())[: min(5, len(columns))]
logger.info(f"Sample column keys: {sample_keys}")
else:
logger.info(f"Columns type: {type(columns).__name__}")
except Exception as e:
logger.warning(f"Error sampling columns: {e}")
else:
logger.warning(f"Table {schema_name}.{table_name} has NO columns")
@ -811,35 +830,6 @@ class FivetranStandardAPI(FivetranAccessInterface):
return column_lineage
def _get_columns_from_sources(
    self,
    table: Dict,
    source_table: str,
    source_table_columns: Dict[str, Dict[str, str]],
) -> List[Dict]:
    """Get columns from various sources.

    Resolution order: the table payload itself, then the schemas
    endpoint, then the pre-collected source_table_columns mapping.
    """
    found = table.get("columns", [])
    # A dict-shaped payload is normalized into the list-of-dicts form.
    if isinstance(found, dict):
        found = self._convert_column_dict_to_list(found)
    # Fall back to the schemas endpoint when the payload had nothing.
    if not found:
        found = self._get_columns_from_schemas_endpoint(source_table)
    # Last resort: columns gathered earlier for this source table.
    if not found and source_table in source_table_columns:
        logger.info(f"Using columns from source_table_columns for {source_table}")
        found = [
            {"name": name, "type": dtype}
            for name, dtype in source_table_columns[source_table].items()
        ]
    return found
def _convert_column_dict_to_list(self, columns_dict: Dict) -> List[Dict]:
"""Convert column dictionary to list format."""
columns_list = []
@ -852,34 +842,6 @@ class FivetranStandardAPI(FivetranAccessInterface):
columns_list.append({"name": col_name})
return columns_list
def _get_columns_from_schemas_endpoint(self, source_table: str) -> List[Dict]:
    """Try to get columns from the schemas endpoint.

    Returns an empty list when the client lacks the capability, the
    table name cannot be parsed, or the lookup fails.
    """
    result: List[Dict] = []
    # Older API clients may not expose this capability at all.
    if not hasattr(self.api_client, "get_table_columns"):
        return result
    logger.info("No columns found in table data, trying schemas endpoint")
    # The endpoint needs a schema.table pair; bail out if either is missing.
    schema_name, sep, table_name = source_table.partition(".")
    if not (sep and schema_name and table_name):
        return result
    try:
        connector_id = self._find_connector_id_for_source_table(source_table)
        if connector_id:
            result = self.api_client.get_table_columns(
                connector_id, schema_name, table_name
            )
            logger.info(f"Retrieved {len(result)} columns from schemas endpoint")
    except Exception as e:
        logger.warning(f"Failed to get columns from schemas endpoint: {e}")
    return result
def _get_columns_from_api(self, source_table: str) -> List[Dict]:
"""Get columns directly from Fivetran API for a table."""
# Parse schema and table name
@ -941,51 +903,6 @@ class FivetranStandardAPI(FivetranAccessInterface):
return None
def _create_column_lineage_from_columns(
    self,
    columns: List[Dict],
    source_table: str,
    destination_platform: str,
) -> List[ColumnLineage]:
    """Create column lineage objects from column data.

    Accepts columns as dicts (preferred) or bare strings; Fivetran
    system columns (prefix "_fivetran") are skipped.
    """
    lineage: List[ColumnLineage] = []
    is_bigquery = destination_platform.lower() == "bigquery"
    for entry in columns:
        if isinstance(entry, dict):
            src_name = entry.get("name")
        elif isinstance(entry, str):
            src_name = entry
        else:
            src_name = None
        # Skip unnamed entries and Fivetran bookkeeping columns.
        if not src_name or src_name.startswith("_fivetran"):
            continue
        # Prefer the explicit name_in_destination when the API supplies it.
        dest_name = None
        if isinstance(entry, dict) and "name_in_destination" in entry:
            dest_name = entry.get("name_in_destination")
            logger.debug(
                f"Using name_in_destination: {src_name} -> {dest_name}"
            )
        # Otherwise derive the destination name from the platform rules.
        if not dest_name:
            dest_name = self._transform_column_name_for_platform(
                src_name, is_bigquery
            )
            logger.debug(f"Transformed name: {src_name} -> {dest_name}")
        lineage.append(
            ColumnLineage(
                source_column=src_name,
                destination_column=dest_name,
            )
        )
    return lineage
def _transform_column_name_for_platform(
self, column_name: str, is_bigquery: bool
) -> str:
@ -1034,67 +951,6 @@ class FivetranStandardAPI(FivetranAccessInterface):
normalized = re.sub(r"[^a-zA-Z0-9]", "", column_name).lower()
return normalized
def _find_best_fuzzy_match(
self, source_col: str, source_norm: str, dest_columns: List[Tuple[str, str]]
) -> Optional[str]:
"""Find best fuzzy match for a source column from destination columns.
Args:
source_col: Original source column name
source_norm: Normalized source column name
dest_columns: List of (original_dest, normalized_dest) tuples
Returns:
Best matching destination column name or None if no good match found
"""
import difflib
# First try to match normalized versions with high cutoff
dest_norms = [dest_norm for _, dest_norm in dest_columns]
matches = difflib.get_close_matches(source_norm, dest_norms, n=1, cutoff=0.8)
if matches:
# Find original dest column with this normalized value
matched_norm = matches[0]
for dest_col, dest_norm in dest_columns:
if dest_norm == matched_norm:
return dest_col
# If no high-quality match found, try a lower threshold on original names
# This helps with acronyms and abbreviated field names
dest_cols = [dest_col for dest_col, _ in dest_columns]
matches = difflib.get_close_matches(source_col, dest_cols, n=1, cutoff=0.6)
if matches:
return matches[0]
# Try special patterns like converting "someField" to "some_field"
snake_case = re.sub("([a-z0-9])([A-Z])", r"\1_\2", source_col).lower()
for dest_col, _ in dest_columns:
if dest_col.lower() == snake_case:
return dest_col
# If source_col contains words that are also in a destination column, consider it a match
# This helps with "BillingStreet" matching "billing_street" or "street_billing"
words = re.findall(r"[A-Z][a-z]+|[a-z]+|[0-9]+", source_col)
if words:
word_matches = {}
for dest_col, _ in dest_columns:
# Count how many words from source appear in destination
dest_words = re.findall(r"[A-Z][a-z]+|[a-z]+|[0-9]+", dest_col)
common_words = len(
set(w.lower() for w in words) & set(w.lower() for w in dest_words)
)
if common_words > 0:
word_matches[dest_col] = common_words
# If we found matches based on common words, return the one with most matches
if word_matches:
return max(word_matches.items(), key=lambda x: x[1])[0]
# No good match found
return None
def _log_column_diagnostics(self, columns: Any) -> None:
"""Log diagnostic information about column data."""
if isinstance(columns, list):

View File

@ -412,52 +412,3 @@ class FivetranSourceTest(unittest.TestCase):
# Verify source was created properly
self.assertIsInstance(source, FivetranSource)
mock_create_access.assert_called_once()
class FivetranLineageTest(unittest.TestCase):
    """Tests for lineage processing functions"""

    @staticmethod
    def _make_client():
        # A FivetranStandardAPI wired to mocked config/report objects.
        return FivetranStandardAPI(MagicMock(), MagicMock())

    def test_lineage_transformation(self):
        """Test column name transformations for different platforms"""
        client = self._make_client()
        # BigQuery: camelCase becomes snake_case.
        self.assertEqual(
            client._transform_column_name_for_platform("userFirstName", True),
            "user_first_name",
        )
        # Snowflake: the name is simply uppercased.
        self.assertEqual(
            client._transform_column_name_for_platform("userFirstName", False),
            "USERFIRSTNAME",
        )
        # Special characters are normalized for BigQuery.
        self.assertEqual(
            client._transform_column_name_for_platform("user.First-Name", True),
            "user_first_name",
        )

    def test_fuzzy_matching(self):
        """Test fuzzy matching for column names"""
        client = self._make_client()
        # Each case: (source, normalized source, destination pairs, expected).
        cases = [
            (
                "firstName",
                "firstname",
                [("first_name", "firstname"), ("last_name", "lastname")],
                "first_name",
            ),
            (
                "userEmail",
                "useremail",
                [("user_email", "useremail"), ("email_addr", "emailaddr")],
                "user_email",
            ),
            (
                "homeAddress",
                "homeaddress",
                [("home_address", "homeaddress"), ("address", "address")],
                "home_address",
            ),
        ]
        for source_col, source_norm, dest_columns, expected in cases:
            self.assertEqual(
                client._find_best_fuzzy_match(source_col, source_norm, dest_columns),
                expected,
            )

View File

@ -156,50 +156,6 @@ class FivetranStandardAPITests(unittest.TestCase):
transformed = self.api._transform_column_name_for_platform(column_name, False)
self.assertEqual(transformed, "USER.FIRST-NAME")
def test_find_best_fuzzy_match(self):
    """Test fuzzy matching for column names"""
    # Patch difflib so each scenario exercises a specific matching branch.
    with patch("difflib.get_close_matches") as fake_close_matches:
        # Scenario 1: the normalized-name lookup yields a close match.
        fake_close_matches.return_value = ["firstname"]
        result = self.api._find_best_fuzzy_match(
            "firstName",
            "firstname",
            [("first_name", "firstname"), ("last_name", "lastname")],
        )
        self.assertEqual(result, "first_name")

        # Scenario 2: no normalized match, but the original names match.
        fake_close_matches.side_effect = [[], ["home_address"]]
        result = self.api._find_best_fuzzy_match(
            "homeAddress",
            "homeaddress",
            [("home_address", "homeaddress"), ("address", "address")],
        )
        self.assertEqual(result, "home_address")

        # Scenario 3: difflib finds nothing; the heuristic fallbacks run.
        fake_close_matches.side_effect = [[], []]
        result = self.api._find_best_fuzzy_match(
            "uniqueField",
            "uniquefield",
            [
                ("other_field", "otherfield"),
                ("another_field", "anotherfield"),
            ],
        )
        # Allow for None or a fallback default
        self.assertTrue(result is None or result in ("other_field", "another_field"))
def test_normalize_column_name(self):
"""Test column name normalization"""
# Test lowercasing and removing special chars