mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-06 22:04:16 +00:00
improved schema matching
This commit is contained in:
parent
b3ea5848fb
commit
6695aeaea4
@ -895,23 +895,56 @@ class FivetranStandardAPI(FivetranAccessInterface):
|
|||||||
return []
|
return []
|
||||||
|
|
||||||
def _find_connector_id_for_source_table(self, source_table: str) -> Optional[str]:
    """Find the ID of the cached connector that most plausibly owns a source table.

    Connectors are read from ``self._connector_cache``; entries without a
    ``connector_id`` attribute are ignored. Matching is attempted in
    decreasing order of confidence, and each stage scans every connector
    before the next stage starts, so a heuristic hit on one connector can
    never shadow an exact hit on another:

    1. Exact, case-insensitive match against a lineage entry's
       ``source_table``.
    2. Per-connector heuristics, in cache order: a table name containing
       "salesforce" paired with a connector of type "salesforce", or a
       lineage entry whose schema prefix (text before the first ".")
       equals the schema prefix of ``source_table``.
    3. A connector whose type equals the schema prefix of ``source_table``.

    Args:
        source_table: Table name, optionally qualified as ``schema.table``.

    Returns:
        The ``connector_id`` of the first connector that matches, or
        ``None`` when ``source_table`` is empty or nothing matches.
    """
    if not source_table:
        return None

    normalized_source = source_table.lower()
    # Only a qualified name carries a schema; an unqualified name must not be
    # mistaken for one, otherwise table "users" would match schema "users".
    schema_name = (
        normalized_source.partition(".")[0] if "." in normalized_source else ""
    )

    # Snapshot (connector, lowercased type, lowercased lineage tables) once so
    # that missing or None attributes are tolerated in a single place.
    candidates = []
    for conn in getattr(self, "_connector_cache", None) or []:
        if not hasattr(conn, "connector_id"):
            continue
        conn_type = str(getattr(conn, "connector_type", None) or "").lower()
        tables = [
            entry.source_table.lower()
            for entry in getattr(conn, "lineage", None) or []
            if isinstance(getattr(entry, "source_table", None), str)
        ]
        candidates.append((conn, conn_type, tables))

    # Stage 1: exact lineage match wins over every heuristic.
    for conn, _conn_type, tables in candidates:
        if normalized_source in tables:
            return conn.connector_id

    # Stage 2: heuristics, evaluated per connector in cache order.
    for conn, conn_type, tables in candidates:
        if "salesforce" in normalized_source and conn_type == "salesforce":
            logger.info(
                "Matching Salesforce table %s to connector %s",
                source_table,
                conn.connector_id,
            )
            return conn.connector_id

        if schema_name and any(
            "." in table and table.partition(".")[0] == schema_name
            for table in tables
        ):
            # If the schema matches, this is probably the right connector.
            logger.info(
                "Found schema match for %s in connector %s",
                source_table,
                conn.connector_id,
            )
            return conn.connector_id

    # Stage 3: fall back to a connector whose type is named like the schema.
    if schema_name:
        for conn, conn_type, _tables in candidates:
            if conn_type == schema_name:
                logger.info(
                    "Matched %s to connector %s based on schema/type match",
                    source_table,
                    conn.connector_id,
                )
                return conn.connector_id

    # No match found.
    return None
|
||||||
|
|
||||||
def _fill_connectors_lineage(self, connectors: List[Connector]) -> None:
|
def _fill_connectors_lineage(self, connectors: List[Connector]) -> None:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user