mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-03 12:16:10 +00:00
cll update
This commit is contained in:
parent
fed49ac79e
commit
5642906be3
@ -254,10 +254,12 @@ class FivetranAPIClient:
|
||||
Normalize schema information into a consistent format regardless of API response structure.
|
||||
"""
|
||||
schemas = []
|
||||
logger.debug(f"Raw schema response type: {type(raw_schemas)}")
|
||||
|
||||
# Handle different response formats
|
||||
if isinstance(raw_schemas, dict):
|
||||
# Handle nested object format (older API versions)
|
||||
logger.debug(f"Schema keys: {list(raw_schemas.keys())}")
|
||||
logger.info(f"Converting nested schema format for connector {connector_id}")
|
||||
for schema_name, schema_data in raw_schemas.items():
|
||||
# Convert to the expected format
|
||||
@ -306,6 +308,10 @@ class FivetranAPIClient:
|
||||
|
||||
schemas.append(schema_obj)
|
||||
elif isinstance(raw_schemas, list):
|
||||
logger.debug(f"Schema list length: {len(raw_schemas)}")
|
||||
if raw_schemas:
|
||||
logger.debug(f"First schema item type: {type(raw_schemas[0])}")
|
||||
|
||||
# Already in the expected list format
|
||||
schemas = raw_schemas
|
||||
|
||||
@ -318,6 +324,7 @@ class FivetranAPIClient:
|
||||
if "columns" not in table:
|
||||
table["columns"] = []
|
||||
else:
|
||||
logger.debug(f"Unexpected schema format: {raw_schemas[:100]}...")
|
||||
logger.warning(
|
||||
f"Unexpected schema format type for connector {connector_id}: {type(raw_schemas)}"
|
||||
)
|
||||
@ -562,61 +569,177 @@ class FivetranAPIClient:
|
||||
) -> None:
|
||||
"""
|
||||
Ensure we have column information for tables by fetching additional details if needed.
|
||||
Uses multiple strategies to get complete column information:
|
||||
1. Check existing schema data first
|
||||
2. Try dedicated table API endpoint for tables missing columns
|
||||
3. Attempt to infer columns from metadata if available
|
||||
"""
|
||||
tables_missing_columns = []
|
||||
tables_with_columns = 0
|
||||
total_tables = 0
|
||||
|
||||
# Check if we have tables without column information
|
||||
for schema in schemas:
|
||||
schema_name = schema.get("name", "")
|
||||
for table in schema.get("tables", []):
|
||||
if not table.get("columns") and table.get("enabled", True):
|
||||
total_tables += 1
|
||||
table_name = table.get("name", "")
|
||||
|
||||
# Skip tables that aren't enabled
|
||||
if not table.get("enabled", True):
|
||||
continue
|
||||
|
||||
# Check if table has column information
|
||||
columns = table.get("columns", [])
|
||||
if not columns:
|
||||
# Add to list of tables needing column info
|
||||
tables_missing_columns.append(
|
||||
{"schema": schema["name"], "table": table["name"]}
|
||||
{
|
||||
"schema": schema_name,
|
||||
"table": table_name,
|
||||
"table_obj": table, # Keep reference to the table object for updates
|
||||
}
|
||||
)
|
||||
else:
|
||||
tables_with_columns += 1
|
||||
|
||||
# Log statistics about column availability
|
||||
logger.info(
|
||||
f"Column information stats for connector {connector_id}: "
|
||||
f"{tables_with_columns} tables have columns, "
|
||||
f"{len(tables_missing_columns)} tables missing columns, "
|
||||
f"out of {total_tables} total tables"
|
||||
)
|
||||
|
||||
if not tables_missing_columns:
|
||||
return
|
||||
|
||||
# Limit the number of API calls to avoid rate limiting
|
||||
tables_to_process = tables_missing_columns[:10]
|
||||
logger.info(
|
||||
f"Found {len(tables_missing_columns)} tables without column information for connector {connector_id}"
|
||||
f"Fetching column information for {len(tables_to_process)} tables out of {len(tables_missing_columns)} missing column info"
|
||||
)
|
||||
|
||||
# Try to fetch column information for these tables
|
||||
for table_info in tables_missing_columns[
|
||||
:10
|
||||
]: # Limit to avoid too many API calls
|
||||
for table_info in tables_to_process:
|
||||
schema_name = table_info["schema"]
|
||||
table_name = table_info["table"]
|
||||
table_obj = table_info["table_obj"]
|
||||
|
||||
try:
|
||||
# Try to get column information from table metadata endpoint
|
||||
table_path = f"/connectors/{connector_id}/schemas/{schema_name}/tables/{table_name}"
|
||||
try:
|
||||
table_response = self._make_request("GET", table_path)
|
||||
table_data = table_response.get("data", {})
|
||||
# Get columns using the dedicated table API endpoint
|
||||
columns = self.get_table_columns(connector_id, schema_name, table_name)
|
||||
|
||||
if "columns" in table_data:
|
||||
columns = table_data.get("columns", [])
|
||||
# Update our schema information with these columns
|
||||
for schema in schemas:
|
||||
if schema["name"] == schema_name:
|
||||
for table in schema["tables"]:
|
||||
if table["name"] == table_name:
|
||||
table["columns"] = self._process_column_data(
|
||||
columns
|
||||
)
|
||||
logger.info(
|
||||
f"Added {len(table['columns'])} columns to {schema_name}.{table_name}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
f"Could not get details for table {schema_name}.{table_name}: {e}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Error ensuring column information for {schema_name}.{table_name}: {e}"
|
||||
if columns:
|
||||
# Update the table object directly with these columns
|
||||
table_obj["columns"] = columns
|
||||
logger.info(
|
||||
f"Updated {schema_name}.{table_name} with {len(columns)} columns from table API"
|
||||
)
|
||||
else:
|
||||
# If API doesn't return columns, try to infer from metadata
|
||||
logger.warning(
|
||||
f"Could not get columns for {schema_name}.{table_name} from API, attempting fallback methods"
|
||||
)
|
||||
|
||||
# Try getting metadata that might have column information
|
||||
try:
|
||||
metadata_path = f"/connectors/{connector_id}/metadata"
|
||||
metadata_response = self._make_request("GET", metadata_path)
|
||||
metadata = metadata_response.get("data", {})
|
||||
|
||||
# Look for column information in metadata
|
||||
source_objects = metadata.get("source_objects", [])
|
||||
for obj in source_objects:
|
||||
if (
|
||||
isinstance(obj, dict)
|
||||
and obj.get("name") == table_name
|
||||
and obj.get("schema") == schema_name
|
||||
):
|
||||
metadata_columns = obj.get("columns", [])
|
||||
if metadata_columns:
|
||||
# Convert to our expected format
|
||||
formatted_columns = []
|
||||
for col in metadata_columns:
|
||||
if isinstance(col, dict) and "name" in col:
|
||||
formatted_columns.append(
|
||||
{
|
||||
"name": col["name"],
|
||||
"type": col.get("type", ""),
|
||||
"enabled": True,
|
||||
}
|
||||
)
|
||||
|
||||
if formatted_columns:
|
||||
table_obj["columns"] = formatted_columns
|
||||
logger.info(
|
||||
f"Inferred {len(formatted_columns)} columns for {schema_name}.{table_name} from metadata"
|
||||
)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to get metadata for {connector_id}: {e}")
|
||||
|
||||
# Count how many tables we still don't have column info for
|
||||
tables_still_missing = 0
|
||||
for schema in schemas:
|
||||
for table in schema.get("tables", []):
|
||||
if table.get("enabled", True) and not table.get("columns"):
|
||||
tables_still_missing += 1
|
||||
|
||||
logger.info(
|
||||
f"After retrieval attempts, {tables_still_missing} tables still missing column information"
|
||||
)
|
||||
|
||||
def get_table_columns(
|
||||
self, connector_id: str, schema_name: str, table_name: str
|
||||
) -> List[Dict]:
|
||||
"""
|
||||
Get detailed column information for a specific table using the tables API endpoint.
|
||||
This is more reliable for column information than the schemas endpoint.
|
||||
"""
|
||||
try:
|
||||
# URL-encode the schema and table names to handle special characters
|
||||
import urllib.parse
|
||||
|
||||
encoded_schema = urllib.parse.quote(schema_name)
|
||||
encoded_table = urllib.parse.quote(table_name)
|
||||
|
||||
# Make the API request for detailed table information
|
||||
response = self._make_request(
|
||||
"GET",
|
||||
f"/connectors/{connector_id}/schemas/{encoded_schema}/tables/{encoded_table}",
|
||||
)
|
||||
|
||||
# Extract column information
|
||||
table_data = response.get("data", {})
|
||||
columns_data = table_data.get("columns", {})
|
||||
|
||||
# Convert column data to a list format if it's a dictionary
|
||||
columns = []
|
||||
if isinstance(columns_data, dict):
|
||||
for col_name, col_info in columns_data.items():
|
||||
if isinstance(col_info, dict):
|
||||
col_info = (
|
||||
col_info.copy()
|
||||
) # Create a copy to avoid modifying the original
|
||||
col_info["name"] = col_name
|
||||
if col_info.get(
|
||||
"enabled", True
|
||||
): # Only include enabled columns
|
||||
columns.append(col_info)
|
||||
else:
|
||||
# Simple case where we just have column names
|
||||
columns.append({"name": col_name, "enabled": True})
|
||||
elif isinstance(columns_data, list):
|
||||
columns = [col for col in columns_data if col.get("enabled", True)]
|
||||
|
||||
logger.info(
|
||||
f"Retrieved {len(columns)} columns for {schema_name}.{table_name} via direct table API"
|
||||
)
|
||||
return columns
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to get columns for {schema_name}.{table_name}: {e}")
|
||||
return []
|
||||
|
||||
def _process_column_data(self, columns: Any) -> List[Dict]:
|
||||
"""
|
||||
@ -1225,15 +1348,20 @@ class FivetranAPIClient:
|
||||
Uses a generic approach that works for any connector type and properly handles name_in_destination.
|
||||
"""
|
||||
try:
|
||||
# Get the connector details first
|
||||
connector_details = self.get_connector(connector_id)
|
||||
# Get the connector schemas first
|
||||
schemas = self.list_connector_schemas(connector_id)
|
||||
|
||||
# Get destination information
|
||||
destination_id = connector_details.get("group", {}).get("id", "")
|
||||
# Log more details about what we retrieved
|
||||
table_count = sum(len(schema.get("tables", [])) for schema in schemas)
|
||||
logger.info(
|
||||
f"Got {len(schemas)} schemas with {table_count} tables for connector {connector_id}"
|
||||
)
|
||||
|
||||
# Get destination information for naming
|
||||
connector = self.get_connector(connector_id)
|
||||
destination_id = connector.get("group", {}).get("id", "")
|
||||
destination_platform = self.detect_destination_platform(destination_id)
|
||||
|
||||
# Get schema information
|
||||
schemas = self.list_connector_schemas(connector_id)
|
||||
lineage_list = []
|
||||
|
||||
# Handle cases where schemas might be a string or invalid format
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user