improvements

Jonny Dixon 2025-03-20 23:26:23 +00:00
parent dd82037994
commit 1d7f4acb8b
2 changed files with 118 additions and 50 deletions

View File

@@ -163,7 +163,7 @@ class FivetranAPIClient:
self, connector_id: str, schema_name: str, table_name: str
) -> List[Dict]:
"""
Get detailed column information for a specific table by directly querying the columns endpoint.
Get detailed column information for a specific table using the direct columns endpoint.
Args:
connector_id: The Fivetran connector ID
@@ -189,7 +189,7 @@ class FivetranAPIClient:
response = self._make_request("GET", url)
# Extract columns from the response
columns_data = response.get("data", {}).get("items", [])
columns_data = response.get("data", {}).get("columns", [])
logger.info(
f"Retrieved {len(columns_data)} columns directly from columns endpoint for {schema_name}.{table_name}"
@@ -200,39 +200,6 @@ class FivetranAPIClient:
logger.warning(
f"Failed to get columns from direct endpoint for {schema_name}.{table_name}: {e}"
)
# Try fallback approaches if the direct endpoint fails
try:
# First try the tables endpoint
encoded_schema = urllib.parse.quote(schema_name)
url = f"/connectors/{connector_id}/schemas/{encoded_schema}/tables"
response = self._make_request("GET", url)
# Find the requested table
tables_data = response.get("data", {}).get("items", [])
for table in tables_data:
if table.get("name") == table_name:
columns = table.get("columns", [])
logger.info(
f"Found {len(columns)} columns from tables endpoint for {schema_name}.{table_name}"
)
return columns
# If still no columns, try the full schemas endpoint
schemas = self.list_connector_schemas(connector_id)
for schema in schemas:
if schema.get("name") == schema_name:
for table in schema.get("tables", []):
if table.get("name") == table_name:
columns = table.get("columns", [])
logger.info(
f"Found {len(columns)} columns from schemas endpoint for {schema_name}.{table_name}"
)
return columns
except Exception as fallback_error:
logger.warning(f"Fallback attempts also failed: {fallback_error}")
return []
def list_connector_schemas(self, connector_id: str) -> List[Dict]:
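
A minimal usage sketch, not part of this commit: the fetch_columns wrapper and the connector/schema/table values are hypothetical placeholders, while FivetranAPIClient.get_table_columns is the method changed above. With the tables/schemas fallbacks removed, an empty result must now be handled at the call site rather than inside the client.

def fetch_columns(client: FivetranAPIClient, connector_id: str) -> None:
    # Hedged example; the client construction is not shown in this diff.
    columns = client.get_table_columns(connector_id, "my_schema", "my_table")
    if not columns:
        # The client no longer falls back to the tables or schemas endpoints.
        print("No columns returned for my_schema.my_table")
        return
    for column in columns:
        # name_in_destination is present only when the API supplies it.
        print(column.get("name"), column.get("name_in_destination"))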

View File

@@ -704,27 +704,104 @@ class FivetranStandardAPI(FivetranAccessInterface):
destination_platform: str,
source_table_columns: Dict[str, Dict[str, str]],
) -> List[ColumnLineage]:
"""Extract column lineage with enhanced retrieval and mapping."""
"""
Extract column-level lineage for a table with improved API integration.
Args:
table: Table data from API
source_table: Full source table name (schema.table)
destination_platform: Destination platform type
source_table_columns: Dict mapping table names to column information
Returns:
List of ColumnLineage objects
"""
logger.info(
f"Extracting column lineage for {source_table} to {destination_platform}"
)
logger.debug(f"Table data keys: {list(table.keys())}")
# Get columns from different sources
columns = self._get_columns_from_sources(
table, source_table, source_table_columns
# 1. First try to get columns from the table data
columns = table.get("columns", [])
# Handle different column formats
if isinstance(columns, dict):
# Convert dict format to list
columns_list = []
for col_name, col_data in columns.items():
if isinstance(col_data, dict):
col_data = col_data.copy()
col_data["name"] = col_name
columns_list.append(col_data)
else:
columns_list.append({"name": col_name})
columns = columns_list
logger.info(f"Converted dict format to list with {len(columns)} columns")
# 2. If no columns found, try to retrieve them directly from the API columns endpoint
if not columns:
logger.info(
f"No columns in table data, trying direct API query for {source_table}"
)
columns = self._get_columns_from_api(source_table)
# 3. If still no columns, try source_table_columns
if not columns and source_table in source_table_columns:
logger.info(f"Using columns from source_table_columns for {source_table}")
columns = [
{"name": col_name, "type": col_type}
for col_name, col_type in source_table_columns[source_table].items()
]
# Now create column lineage from the columns we have
column_lineage = []
is_bigquery = destination_platform.lower() == "bigquery"
# If we still don't have any columns, return empty list
if not columns:
logger.warning(f"No column information available for {source_table}")
return []
# Create column lineage
column_lineage = self._create_column_lineage_from_columns(
columns, source_table, destination_platform
for column in columns:
col_name = None
if isinstance(column, dict):
col_name = column.get("name")
elif isinstance(column, str):
col_name = column
if not col_name or col_name.startswith("_fivetran"):
continue
# Get destination column name - prefer name_in_destination if available
dest_col_name = None
if isinstance(column, dict) and "name_in_destination" in column:
dest_col_name = column.get("name_in_destination")
logger.debug(
f"Using name_in_destination: {col_name} -> {dest_col_name}"
)
# If no name_in_destination, transform based on platform
if not dest_col_name:
if is_bigquery:
# For BigQuery, convert to snake_case
dest_col_name = self._transform_column_name_for_platform(
col_name, True
)
else:
# For other platforms like Snowflake, typically uppercase
dest_col_name = self._transform_column_name_for_platform(
col_name, False
)
logger.debug(f"Transformed name: {col_name} -> {dest_col_name}")
# Add to lineage
column_lineage.append(
ColumnLineage(
source_column=col_name,
destination_column=dest_col_name,
)
)
# Log results
if column_lineage:
logger.info(
f"Created {len(column_lineage)} column lineage entries for {source_table}"
@@ -822,22 +899,46 @@ class FivetranStandardAPI(FivetranAccessInterface):
)
return []
logger.info(
f"Found connector ID {connector_id} for source table {source_table}"
)
logger.info(f"Querying API for columns of {schema_name}.{table_name}")
# Call the API to get columns using the direct columns endpoint
try:
columns = self.api_client.get_table_columns(
connector_id, schema_name, table_name
)
logger.info(f"Retrieved {len(columns)} columns from API for {source_table}")
if columns:
logger.info(
f"Retrieved {len(columns)} columns from API for {source_table}"
)
return columns
else:
logger.warning(f"No columns returned from API for {source_table}")
except Exception as e:
logger.warning(f"Failed to get columns from API for {source_table}: {e}")
return []
def _find_connector_id_for_source_table(self, source_table: str) -> Optional[str]:
"""Find the connector ID for a source table."""
for conn in getattr(self, "_connector_cache", []):
if hasattr(conn, "connector_id") and source_table in str(conn.lineage):
if not hasattr(conn, "connector_id"):
continue
# Check in lineage
for lineage in getattr(conn, "lineage", []):
if (
hasattr(lineage, "source_table")
and lineage.source_table == source_table
):
return conn.connector_id
# Also check as substring in case formats don't match exactly
if source_table in str(conn.lineage):
return conn.connector_id
return None
def _create_column_lineage_from_columns(
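
The lookup above prefers an exact match on each cached connector's lineage.source_table before falling back to the substring check; a small sketch of that ordering using hypothetical stand-in objects (the real connector and lineage classes, and the conn_123 ID, are placeholders not shown in this diff):

from types import SimpleNamespace

lineage = SimpleNamespace(source_table="public.orders")
connector = SimpleNamespace(connector_id="conn_123", lineage=[lineage])
fake_api = SimpleNamespace(_connector_cache=[connector])

find = FivetranStandardAPI._find_connector_id_for_source_table
assert find(fake_api, "public.orders") == "conn_123"  # exact lineage match
assert find(fake_api, "orders") == "conn_123"         # substring fallback
assert find(fake_api, "another.table") is None        # no match at all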