mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-10 00:11:15 +00:00
feat(ingest): Improve lookml sql derived tables detection, add cascading derived tables to lineage (#2770)
This commit is contained in:
parent
6fee59ebac
commit
2aa95ec750
@@ -82,7 +82,7 @@ plugins: Dict[str, Set[str]] = {
|
|||||||
},
|
},
|
||||||
"ldap": {"python-ldap>=2.4"},
|
"ldap": {"python-ldap>=2.4"},
|
||||||
"looker": {"looker-sdk==21.6.0"},
|
"looker": {"looker-sdk==21.6.0"},
|
||||||
"lookml": {"lkml>=1.1.0", "sql-metadata==1.12.0"},
|
"lookml": {"lkml>=1.1.0", "sql-metadata==2.2.1"},
|
||||||
"mongodb": {"pymongo>=3.11"},
|
"mongodb": {"pymongo>=3.11"},
|
||||||
"mssql": sql_common | {"sqlalchemy-pytds>=0.3"},
|
"mssql": sql_common | {"sqlalchemy-pytds>=0.3"},
|
||||||
"mssql-odbc": sql_common | {"pyodbc"},
|
"mssql-odbc": sql_common | {"pyodbc"},
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ if sys.version_info >= (3, 7):
|
|||||||
import lkml
|
import lkml
|
||||||
else:
|
else:
|
||||||
raise ModuleNotFoundError("The lookml plugin requires Python 3.7 or newer.")
|
raise ModuleNotFoundError("The lookml plugin requires Python 3.7 or newer.")
|
||||||
from sql_metadata import get_query_tables
|
from sql_metadata import Parser as SQLParser
|
||||||
|
|
||||||
from datahub.configuration import ConfigModel
|
from datahub.configuration import ConfigModel
|
||||||
from datahub.configuration.common import AllowDenyPattern
|
from datahub.configuration.common import AllowDenyPattern
|
||||||
@@ -197,20 +197,9 @@ class LookerView: # pragma: no cover
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _get_sql_table_names(cls, sql: str) -> List[str]:
|
def _get_sql_table_names(cls, sql: str) -> List[str]:
|
||||||
sql_tables: List[str] = get_query_tables(sql)
|
sql_table_names: List[str] = SQLParser(sql).tables
|
||||||
|
|
||||||
# Remove temporary tables from WITH statements
|
# Remove quotes from table names
|
||||||
sql_table_names = [
|
|
||||||
t
|
|
||||||
for t in sql_tables
|
|
||||||
if not re.search(
|
|
||||||
fr"WITH(.*,)?\s+{t}(\s*\([\w\s,]+\))?\s+AS\s+\(",
|
|
||||||
sql,
|
|
||||||
re.IGNORECASE | re.DOTALL,
|
|
||||||
)
|
|
||||||
]
|
|
||||||
|
|
||||||
# Remove quotes from tables
|
|
||||||
sql_table_names = [t.replace('"', "") for t in sql_table_names]
|
sql_table_names = [t.replace('"', "") for t in sql_table_names]
|
||||||
|
|
||||||
return sql_table_names
|
return sql_table_names
|
||||||
@@ -383,7 +372,12 @@ class LookMLSource(Source): # pragma: no cover
|
|||||||
def _construct_datalineage_urn(self, sql_table_name: str, connection: str) -> str:
|
def _construct_datalineage_urn(self, sql_table_name: str, connection: str) -> str:
|
||||||
platform = self._get_platform_based_on_connection(connection)
|
platform = self._get_platform_based_on_connection(connection)
|
||||||
|
|
||||||
if "." in platform:
|
# Check if table name matches cascading derived tables pattern (same platform)
|
||||||
|
if re.fullmatch(r"\w+\.SQL_TABLE_NAME", sql_table_name):
|
||||||
|
platform_name = self.source_config.platform_name
|
||||||
|
sql_table_name = sql_table_name.lower().split(".")[0]
|
||||||
|
# Check if table database is in platform name (upstream platform)
|
||||||
|
elif "." in platform:
|
||||||
platform_name, database_name = platform.lower().split(".", maxsplit=1)
|
platform_name, database_name = platform.lower().split(".", maxsplit=1)
|
||||||
sql_table_name = f"{database_name}.{sql_table_name}".lower()
|
sql_table_name = f"{database_name}.{sql_table_name}".lower()
|
||||||
else:
|
else:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user