fix(ingest): replace sqllineage/sqlparse with our SQL parser (#12020)

This commit is contained in:
sagar-salvi-apptware 2024-12-10 22:06:01 +05:30 committed by GitHub
parent 61fffb2a81
commit 57b12bd9cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 130 additions and 435 deletions

View File

@ -19,19 +19,21 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
## Next
- #11560 - The PowerBI ingestion source configuration option include_workspace_name_in_dataset_urn determines whether the workspace name is included in the PowerBI dataset's URN.<br/> PowerBI allows to have identical name of semantic model and their tables across the workspace, It will overwrite the semantic model in-case of multi-workspace ingestion.<br/>
Entity urn with `include_workspace_name_in_dataset_urn: false`
```
urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.]<SemanticModelName>.<TableName>,<ENV>)
```
Entity urn with `include_workspace_name_in_dataset_urn: false`
Entity urn with `include_workspace_name_in_dataset_urn: true`
```
urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.].<WorkspaceName>.<SemanticModelName>.<TableName>,<ENV>)
```
```
urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.]<SemanticModelName>.<TableName>,<ENV>)
```
Entity urn with `include_workspace_name_in_dataset_urn: true`
```
urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.].<WorkspaceName>.<SemanticModelName>.<TableName>,<ENV>)
```
The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatiblity, However, we recommend enabling this flag after performing the necessary cleanup.
If stateful ingestion is enabled, running ingestion with the latest CLI version will handle the cleanup automatically. Otherwise, we recommend soft deleting all powerbi data via the DataHub CLI:
`datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true.
`datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true.
- #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting directly within `sources_to_platform_instance.<key>.database`.
- #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information.
@ -48,6 +50,9 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #11619 - schema field/column paths can no longer be duplicated within the schema
- #11570 - The `DatahubClientConfig`'s server field no longer defaults to `http://localhost:8080`. Be sure to explicitly set this.
- #11570 - If a `datahub_api` is explicitly passed to a stateful ingestion config provider, it will be used. We previously ignored it if the pipeline context also had a graph object.
- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted (after 10d) or are timeseries _entities_ (dataprocess, execution requests) will be removed automatically using logic in the `datahub-gc` ingestion source.
- #12020 - Removed `sql_parser` configuration from the Redash source, as Redash now exclusively uses the sqlglot-based parser for lineage extraction.
- #12020 - Removed `datahub.utilities.sql_parser`, `datahub.utilities.sql_parser_base` and `datahub.utilities.sql_lineage_parser_impl` module along with `SqlLineageSQLParser` and `DefaultSQLParser`. Use `create_lineage_sql_parsed_result` from `datahub.sql_parsing.sqlglot_lineage` module instead.
- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted
(after 10d) or are timeseries *entities* (dataprocess, execution requests)
will be removed automatically using logic in the `datahub-gc` ingestion

View File

@ -15,15 +15,6 @@ def get_long_description():
rest_common = {"requests", "requests_file"}
# TODO: Can we move away from sqllineage and use sqlglot ??
sqllineage_lib = {
"sqllineage==1.3.8",
# We don't have a direct dependency on sqlparse but it is a dependency of sqllineage.
# There have previously been issues from not pinning sqlparse, so it's best to pin it.
# Related: https://github.com/reata/sqllineage/issues/361 and https://github.com/reata/sqllineage/pull/360
"sqlparse==0.4.4",
}
_version: str = package_metadata["__version__"]
_self_pin = (
f"=={_version}"
@ -43,8 +34,7 @@ base_requirements = {
# https://github.com/ipython/traitlets/issues/741
"traitlets<5.2.2",
*rest_common,
*sqllineage_lib,
f"acryl-datahub[datahub-rest]{_self_pin}",
f"acryl-datahub[datahub-rest,sql-parser]{_self_pin}",
}
mypy_stubs = {

View File

@ -34,8 +34,9 @@ from datahub.metadata.com.linkedin.pegasus2avro.assertion import (
)
from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
from datahub.metadata.schema_classes import PartitionSpecClass, PartitionTypeClass
from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
from datahub.utilities._markupsafe_compat import MARKUPSAFE_PATCHED
from datahub.utilities.sql_parser import DefaultSQLParser
from datahub.utilities.urns.dataset_urn import DatasetUrn
from great_expectations.checkpoint.actions import ValidationAction
from great_expectations.core.batch import Batch
from great_expectations.core.batch_spec import (
@ -677,10 +678,23 @@ class DataHubValidationAction(ValidationAction):
query=query,
customProperties=batchSpecProperties,
)
try:
tables = DefaultSQLParser(query).get_tables()
except Exception as e:
logger.warning(f"Sql parser failed on {query} with {e}")
data_platform = get_platform_from_sqlalchemy_uri(str(sqlalchemy_uri))
sql_parser_in_tables = create_lineage_sql_parsed_result(
query=query,
platform=data_platform,
env=self.env,
platform_instance=None,
default_db=None,
)
tables = [
DatasetUrn.from_string(table_urn).name
for table_urn in sql_parser_in_tables.in_tables
]
if sql_parser_in_tables.debug_info.table_error:
logger.warning(
f"Sql parser failed on {query} with {sql_parser_in_tables.debug_info.table_error}"
)
tables = []
if len(set(tables)) != 1:

View File

@ -1,5 +1,2 @@
Note! The integration can use an SQL parser to try to parse the tables the chart depends on. This parsing is disabled by default,
but can be enabled by setting `parse_table_names_from_sql: true`. The default parser is based on the [`sqllineage`](https://pypi.org/project/sqllineage/) package.
As this package doesn't officially support all the SQL dialects that Redash supports, the result might not be correct. You can, however, implement a
custom parser and take it into use by setting the `sql_parser` configuration value. A custom SQL parser must inherit from `datahub.utilities.sql_parser.SQLParser`
and must be made available to Datahub by ,for example, installing it. The configuration then needs to be set to `module_name.ClassName` of the parser.
Note! The integration can use an SQL parser to try to parse the tables the chart depends on. This parsing is disabled by default,
but can be enabled by setting `parse_table_names_from_sql: true`. The parser is based on the [`sqlglot`](https://pypi.org/project/sqlglot/) package.

View File

@ -159,14 +159,6 @@ sql_common = (
| classification_lib
)
sqllineage_lib = {
"sqllineage==1.3.8",
# We don't have a direct dependency on sqlparse but it is a dependency of sqllineage.
# There have previously been issues from not pinning sqlparse, so it's best to pin it.
# Related: https://github.com/reata/sqllineage/issues/361 and https://github.com/reata/sqllineage/pull/360
"sqlparse==0.4.4",
}
aws_common = {
# AWS Python SDK
"boto3",
@ -216,7 +208,6 @@ redshift_common = {
"sqlalchemy-redshift>=0.8.3",
"GeoAlchemy2",
"redshift-connector>=2.1.0",
*sqllineage_lib,
*path_spec_common,
}
@ -464,9 +455,7 @@ plugins: Dict[str, Set[str]] = {
# It's technically wrong for packages to depend on setuptools. However, it seems mlflow does it anyways.
"setuptools",
},
"mode": {"requests", "python-liquid", "tenacity>=8.0.1"}
| sqllineage_lib
| sqlglot_lib,
"mode": {"requests", "python-liquid", "tenacity>=8.0.1"} | sqlglot_lib,
"mongodb": {"pymongo[srv]>=3.11", "packaging"},
"mssql": sql_common | mssql_common,
"mssql-odbc": sql_common | mssql_common | {"pyodbc"},
@ -482,7 +471,7 @@ plugins: Dict[str, Set[str]] = {
| pyhive_common
| {"psycopg2-binary", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
"redash": {"redash-toolbelt", "sql-metadata"} | sqlglot_lib,
"redshift": sql_common
| redshift_common
| usage_common
@ -503,9 +492,7 @@ plugins: Dict[str, Set[str]] = {
"slack": slack,
"superset": superset_common,
"preset": superset_common,
# FIXME: I don't think tableau uses sqllineage anymore so we should be able
# to remove that dependency.
"tableau": {"tableauserverclient>=0.24.0"} | sqllineage_lib | sqlglot_lib,
"tableau": {"tableauserverclient>=0.24.0"} | sqlglot_lib,
"teradata": sql_common
| usage_common
| sqlglot_lib
@ -527,9 +514,9 @@ plugins: Dict[str, Set[str]] = {
),
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"},
"unity-catalog": databricks | sql_common | sqllineage_lib,
"unity-catalog": databricks | sql_common,
# databricks is alias for unity-catalog and needs to be kept in sync
"databricks": databricks | sql_common | sqllineage_lib,
"databricks": databricks | sql_common,
"fivetran": snowflake_common | bigquery_common | sqlglot_lib,
"qlik-sense": sqlglot_lib | {"requests", "websocket-client"},
"sigma": sqlglot_lib | {"requests"},

View File

@ -18,7 +18,6 @@ from pydantic import Field, validator
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import ConnectionError
from requests.models import HTTPBasicAuth, HTTPError
from sqllineage.runner import LineageRunner
from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential
import datahub.emitter.mce_builder as builder
@ -820,28 +819,6 @@ class ModeSource(StatefulIngestionSourceBase):
)
return None
@lru_cache(maxsize=None)
def _get_source_from_query(self, raw_query: str) -> set:
query = self._replace_definitions(raw_query)
parser = LineageRunner(query)
source_paths = set()
try:
for table in parser.source_tables:
sources = str(table).split(".")
source_schema, source_table = sources[-2], sources[-1]
if source_schema == "<default>":
source_schema = str(self.config.default_schema)
source_paths.add(f"{source_schema}.{source_table}")
except Exception as e:
self.report.report_failure(
title="Failed to Extract Lineage From Query",
message="Unable to retrieve lineage from Mode query.",
context=f"Query: {raw_query}, Error: {str(e)}",
)
return source_paths
def _get_datasource_urn(
self,
platform: str,

View File

@ -2,7 +2,7 @@ import logging
import math
import sys
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional, Set, Type
from typing import Dict, Iterable, List, Optional, Set
import dateutil.parser as dp
from packaging import version
@ -22,7 +22,6 @@ from datahub.ingestion.api.decorators import ( # SourceCapability,; capability,
platform_name,
support_status,
)
from datahub.ingestion.api.registry import import_path
from datahub.ingestion.api.source import Source, SourceCapability, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import (
@ -39,9 +38,9 @@ from datahub.metadata.schema_classes import (
ChartTypeClass,
DashboardInfoClass,
)
from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.sql_parser_base import SQLParser
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor
logger = logging.getLogger(__name__)
@ -270,10 +269,6 @@ class RedashConfig(ConfigModel):
parse_table_names_from_sql: bool = Field(
default=False, description="See note below."
)
sql_parser: str = Field(
default="datahub.utilities.sql_parser.DefaultSQLParser",
description="custom SQL parser. See note below for details.",
)
env: str = Field(
default=DEFAULT_ENV,
@ -354,7 +349,6 @@ class RedashSource(Source):
self.api_page_limit = self.config.api_page_limit or math.inf
self.parse_table_names_from_sql = self.config.parse_table_names_from_sql
self.sql_parser_path = self.config.sql_parser
logger.info(
f"Running Redash ingestion with parse_table_names_from_sql={self.parse_table_names_from_sql}"
@ -380,31 +374,6 @@ class RedashSource(Source):
config = RedashConfig.parse_obj(config_dict)
return cls(ctx, config)
@classmethod
def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]:
assert "." in sql_parser_path, "sql_parser-path must contain a ."
parser_cls = import_path(sql_parser_path)
if not issubclass(parser_cls, SQLParser):
raise ValueError(f"must be derived from {SQLParser}; got {parser_cls}")
return parser_cls
@classmethod
def _get_sql_table_names(cls, sql: str, sql_parser_path: str) -> List[str]:
parser_cls = cls._import_sql_parser_cls(sql_parser_path)
try:
sql_table_names: List[str] = parser_cls(sql).get_tables()
except Exception as e:
logger.warning(f"Sql parser failed on {sql} with {e}")
return []
# Remove quotes from table names
sql_table_names = [t.replace('"', "") for t in sql_table_names]
sql_table_names = [t.replace("`", "") for t in sql_table_names]
return sql_table_names
def _get_chart_data_source(self, data_source_id: Optional[int] = None) -> Dict:
url = f"/api/data_sources/{data_source_id}"
resp = self.client._get(url).json()
@ -441,14 +410,6 @@ class RedashSource(Source):
return database_name
def _construct_datalineage_urn(
self, platform: str, database_name: str, sql_table_name: str
) -> str:
full_dataset_name = get_full_qualified_name(
platform, database_name, sql_table_name
)
return builder.make_dataset_urn(platform, full_dataset_name, self.config.env)
def _get_datasource_urns(
self, data_source: Dict, sql_query_data: Dict = {}
) -> Optional[List[str]]:
@ -464,34 +425,23 @@ class RedashSource(Source):
# Getting table lineage from SQL parsing
if self.parse_table_names_from_sql and data_source_syntax == "sql":
dataset_urns = list()
try:
sql_table_names = self._get_sql_table_names(
query, self.sql_parser_path
)
except Exception as e:
sql_parser_in_tables = create_lineage_sql_parsed_result(
query=query,
platform=platform,
env=self.config.env,
platform_instance=None,
default_db=database_name,
)
# make sure dataset_urns is not empty list
dataset_urns = sql_parser_in_tables.in_tables
if sql_parser_in_tables.debug_info.table_error:
self.report.queries_problem_parsing.add(str(query_id))
self.error(
logger,
"sql-parsing",
f"exception {e} in parsing query-{query_id}-datasource-{data_source_id}",
f"exception {sql_parser_in_tables.debug_info.table_error} in parsing query-{query_id}-datasource-{data_source_id}",
)
sql_table_names = []
for sql_table_name in sql_table_names:
try:
dataset_urns.append(
self._construct_datalineage_urn(
platform, database_name, sql_table_name
)
)
except Exception:
self.report.queries_problem_parsing.add(str(query_id))
self.warn(
logger,
"data-urn-invalid",
f"Problem making URN for {sql_table_name} parsed from query {query_id}",
)
# make sure dataset_urns is not empty list
return dataset_urns if len(dataset_urns) > 0 else None
else:

View File

@ -7,7 +7,6 @@ from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, Set,
import pyspark
from databricks.sdk.service.sql import QueryStatementType
from sqllineage.runner import LineageRunner
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics
@ -22,7 +21,9 @@ from datahub.ingestion.source.unity.proxy_types import (
from datahub.ingestion.source.unity.report import UnityCatalogReport
from datahub.ingestion.source.usage.usage_common import UsageAggregator
from datahub.metadata.schema_classes import OperationClass
from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint
from datahub.utilities.urns.dataset_urn import DatasetUrn
logger = logging.getLogger(__name__)
@ -48,6 +49,7 @@ class UnityCatalogUsageExtractor:
proxy: UnityCatalogApiProxy
table_urn_builder: Callable[[TableReference], str]
user_urn_builder: Callable[[str], str]
platform: str = "databricks"
def __post_init__(self):
self.usage_aggregator = UsageAggregator[TableReference](self.config)
@ -173,7 +175,7 @@ class UnityCatalogUsageExtractor:
self, query: Query, table_map: TableMap
) -> Optional[QueryTableInfo]:
with self.report.usage_perf_report.sql_parsing_timer:
table_info = self._parse_query_via_lineage_runner(query.query_text)
table_info = self._parse_query_via_sqlglot(query.query_text)
if table_info is None and query.statement_type == QueryStatementType.SELECT:
with self.report.usage_perf_report.spark_sql_parsing_timer:
table_info = self._parse_query_via_spark_sql_plan(query.query_text)
@ -191,26 +193,33 @@ class UnityCatalogUsageExtractor:
),
)
def _parse_query_via_lineage_runner(self, query: str) -> Optional[StringTableInfo]:
def _parse_query_via_sqlglot(self, query: str) -> Optional[StringTableInfo]:
try:
runner = LineageRunner(query)
sql_parser_in_tables = create_lineage_sql_parsed_result(
query=query,
default_db=None,
platform=self.platform,
env=self.config.env,
platform_instance=None,
)
return GenericTableInfo(
source_tables=[
self._parse_sqllineage_table(table)
for table in runner.source_tables
self._parse_sqlglot_table(table)
for table in sql_parser_in_tables.in_tables
],
target_tables=[
self._parse_sqllineage_table(table)
for table in runner.target_tables
self._parse_sqlglot_table(table)
for table in sql_parser_in_tables.out_tables
],
)
except Exception as e:
logger.info(f"Could not parse query via lineage runner, {query}: {e!r}")
logger.info(f"Could not parse query via sqlglot, {query}: {e!r}")
return None
@staticmethod
def _parse_sqllineage_table(sqllineage_table: object) -> str:
full_table_name = str(sqllineage_table)
def _parse_sqlglot_table(table_urn: str) -> str:
full_table_name = DatasetUrn.from_string(table_urn).name
default_schema = "<default>."
if full_table_name.startswith(default_schema):
return full_table_name[len(default_schema) :]

View File

@ -1,160 +0,0 @@
import contextlib
import logging
import re
import unittest
import unittest.mock
from typing import Dict, List, Optional, Set
from sqllineage.core.holders import Column, SQLLineageHolder
from sqllineage.exceptions import SQLLineageException
from datahub.utilities.sql_parser_base import SQLParser, SqlParserException
with contextlib.suppress(ImportError):
import sqlparse
from networkx import DiGraph
from sqllineage.core import LineageAnalyzer
import datahub.utilities.sqllineage_patch
logger = logging.getLogger(__name__)
class SqlLineageSQLParserImpl(SQLParser):
_DATE_SWAP_TOKEN = "__d_a_t_e"
_HOUR_SWAP_TOKEN = "__h_o_u_r"
_TIMESTAMP_SWAP_TOKEN = "__t_i_m_e_s_t_a_m_p"
_DATA_SWAP_TOKEN = "__d_a_t_a"
_ADMIN_SWAP_TOKEN = "__a_d_m_i_n"
_MYVIEW_SQL_TABLE_NAME_TOKEN = "__my_view__.__sql_table_name__"
_MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME"
def __init__(self, sql_query: str, use_raw_names: bool = False) -> None:
super().__init__(sql_query)
original_sql_query = sql_query
self._use_raw_names = use_raw_names
# SqlLineageParser makes mistakes on lateral flatten queries, use the prefix
if "lateral flatten" in sql_query:
sql_query = sql_query[: sql_query.find("lateral flatten")]
# Replace reserved words that break SqlLineageParser
self.token_to_original: Dict[str, str] = {
self._DATE_SWAP_TOKEN: "date",
self._HOUR_SWAP_TOKEN: "hour",
self._TIMESTAMP_SWAP_TOKEN: "timestamp",
self._DATA_SWAP_TOKEN: "data",
self._ADMIN_SWAP_TOKEN: "admin",
}
for replacement, original in self.token_to_original.items():
# Replace original tokens with replacement. Since table and column name can contain a hyphen('-'),
# also prevent original tokens appearing as part of these names with a hyphen from getting substituted.
sql_query = re.sub(
rf"((?<!-)\b{original}\b)(?!-)",
rf"{replacement}",
sql_query,
flags=re.IGNORECASE,
)
# SqlLineageParser lowercarese tablenames and we need to replace Looker specific token which should be uppercased
sql_query = re.sub(
rf"(\${{{self._MYVIEW_LOOKER_TOKEN}}})",
rf"{self._MYVIEW_SQL_TABLE_NAME_TOKEN}",
sql_query,
)
# SqlLineageParser does not handle "encode" directives well. Remove them
sql_query = re.sub(r"\sencode [a-zA-Z]*", "", sql_query, flags=re.IGNORECASE)
# Replace lookml templates with the variable otherwise sqlparse can't parse ${
sql_query = re.sub(r"(\${)(.+)(})", r"\2", sql_query)
if sql_query != original_sql_query:
logger.debug(f"Rewrote original query {original_sql_query} as {sql_query}")
self._sql = sql_query
self._stmt_holders: Optional[List[LineageAnalyzer]] = None
self._sql_holder: Optional[SQLLineageHolder] = None
try:
self._stmt = [
s
for s in sqlparse.parse(
# first apply sqlparser formatting just to get rid of comments, which cause
# inconsistencies in parsing output
sqlparse.format(
self._sql.strip(),
strip_comments=True,
use_space_around_operators=True,
),
)
if s.token_first(skip_cm=True)
]
with unittest.mock.patch(
"sqllineage.core.handlers.source.SourceHandler.end_of_query_cleanup",
datahub.utilities.sqllineage_patch.end_of_query_cleanup_patch,
):
with unittest.mock.patch(
"sqllineage.core.holders.SubQueryLineageHolder.add_column_lineage",
datahub.utilities.sqllineage_patch.add_column_lineage_patch,
):
self._stmt_holders = [
LineageAnalyzer().analyze(stmt) for stmt in self._stmt
]
self._sql_holder = SQLLineageHolder.of(*self._stmt_holders)
except SQLLineageException as e:
raise SqlParserException(
f"SQL lineage analyzer error '{e}' for query: '{self._sql}"
) from e
def get_tables(self) -> List[str]:
result: List[str] = []
if self._sql_holder is None:
logger.error("sql holder not present so cannot get tables")
return result
for table in self._sql_holder.source_tables:
table_normalized = re.sub(
r"^<default>.",
"",
(
str(table)
if not self._use_raw_names
else f"{table.schema.raw_name}.{table.raw_name}"
),
)
result.append(str(table_normalized))
# We need to revert TOKEN replacements
for token, replacement in self.token_to_original.items():
result = [replacement if c == token else c for c in result]
result = [
self._MYVIEW_LOOKER_TOKEN if c == self._MYVIEW_SQL_TABLE_NAME_TOKEN else c
for c in result
]
# Sort tables to make the list deterministic
result.sort()
return result
def get_columns(self) -> List[str]:
if self._sql_holder is None:
raise SqlParserException("sql holder not present so cannot get columns")
graph: DiGraph = self._sql_holder.graph # For mypy attribute checking
column_nodes = [n for n in graph.nodes if isinstance(n, Column)]
column_graph = graph.subgraph(column_nodes)
target_columns = {column for column, deg in column_graph.out_degree if deg == 0}
result: Set[str] = set()
for column in target_columns:
# Let's drop all the count(*) and similard columns which are expression actually if it does not have an alias
if not any(ele in column.raw_name for ele in ["*", "(", ")"]):
result.add(str(column.raw_name))
# Reverting back all the previously renamed words which confuses the parser
result = {"date" if c == self._DATE_SWAP_TOKEN else c for c in result}
result = {
"timestamp" if c == self._TIMESTAMP_SWAP_TOKEN else c for c in list(result)
}
# swap back renamed date column
return list(result)

View File

@ -1,94 +0,0 @@
import logging
import multiprocessing
import traceback
from multiprocessing import Process, Queue
from typing import Any, List, Optional, Tuple
from datahub.utilities.sql_lineage_parser_impl import SqlLineageSQLParserImpl
from datahub.utilities.sql_parser_base import SQLParser
logger = logging.getLogger(__name__)
def sql_lineage_parser_impl_func_wrapper(
queue: Optional[multiprocessing.Queue], sql_query: str, use_raw_names: bool = False
) -> Optional[Tuple[List[str], List[str], Any]]:
"""
The wrapper function that computes the tables and columns using the SqlLineageSQLParserImpl
and puts the results on the shared IPC queue. This is used to isolate SqlLineageSQLParserImpl
functionality in a separate process, and hence protect our sources from memory leaks originating in
the sqllineage module.
:param queue: The shared IPC queue on to which the results will be put.
:param sql_query: The SQL query to extract the tables & columns from.
:param use_raw_names: Parameter used to ignore sqllineage's default lowercasing.
:return: None.
"""
exception_details: Optional[Tuple[BaseException, str]] = None
tables: List[str] = []
columns: List[str] = []
try:
parser = SqlLineageSQLParserImpl(sql_query, use_raw_names)
tables = parser.get_tables()
columns = parser.get_columns()
except BaseException as e:
exc_msg = traceback.format_exc()
exception_details = (e, exc_msg)
logger.debug(exc_msg)
if queue is not None:
queue.put((tables, columns, exception_details))
return None
else:
return (tables, columns, exception_details)
class SqlLineageSQLParser(SQLParser):
def __init__(
self,
sql_query: str,
use_external_process: bool = False,
use_raw_names: bool = False,
) -> None:
super().__init__(sql_query, use_external_process)
if use_external_process:
self.tables, self.columns = self._get_tables_columns_process_wrapped(
sql_query, use_raw_names
)
else:
return_tuple = sql_lineage_parser_impl_func_wrapper(
None, sql_query, use_raw_names
)
if return_tuple is not None:
(
self.tables,
self.columns,
some_exception,
) = return_tuple
@staticmethod
def _get_tables_columns_process_wrapped(
sql_query: str, use_raw_names: bool = False
) -> Tuple[List[str], List[str]]:
# Invoke sql_lineage_parser_impl_func_wrapper in a separate process to avoid
# memory leaks from sqllineage module used by SqlLineageSQLParserImpl. This will help
# shield our sources like lookml & redash, that need to parse a large number of SQL statements,
# from causing significant memory leaks in the datahub cli during ingestion.
queue: multiprocessing.Queue = Queue()
process: multiprocessing.Process = Process(
target=sql_lineage_parser_impl_func_wrapper,
args=(queue, sql_query, use_raw_names),
)
process.start()
tables, columns, exception_details = queue.get(block=True)
if exception_details is not None:
raise exception_details[0](f"Sub-process exception: {exception_details[1]}")
return tables, columns
def get_tables(self) -> List[str]:
return self.tables
def get_columns(self) -> List[str]:
return self.columns
DefaultSQLParser = SqlLineageSQLParser

View File

@ -1,21 +0,0 @@
from abc import ABCMeta, abstractmethod
from typing import List
class SqlParserException(Exception):
"""Raised when sql parser fails"""
pass
class SQLParser(metaclass=ABCMeta):
def __init__(self, sql_query: str, use_external_process: bool = True) -> None:
self._sql_query = sql_query
@abstractmethod
def get_tables(self) -> List[str]:
pass
@abstractmethod
def get_columns(self) -> List[str]:
pass

View File

@ -710,9 +710,9 @@ def test_get_chart_snapshot_parse_table_names_from_sql(mocked_data_source):
),
chartUrl="http://localhost:5000/queries/4#10",
inputs=[
"urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.order_items,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.orders,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.staffs,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.order_items,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.orders,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.staffs,PROD)",
],
type="PIE",
)

View File

@ -1,8 +1,55 @@
import doctest
import re
from typing import List
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage
from datahub.utilities.delayed_iter import delayed_iter
from datahub.utilities.is_pytest import is_pytest_running
from datahub.utilities.sql_parser import SqlLineageSQLParser
from datahub.utilities.urns.dataset_urn import DatasetUrn
class SqlLineageSQLParser:
"""
It uses `sqlglot_lineage` to extract tables and columns, serving as a replacement for the `sqllineage` implementation, similar to BigQuery.
Reference: [BigQuery SQL Lineage Test](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/tests/unit/bigquery/test_bigquery_sql_lineage.py#L8).
"""
_MYVIEW_SQL_TABLE_NAME_TOKEN = "__my_view__.__sql_table_name__"
_MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME"
def __init__(self, sql_query: str, platform: str = "bigquery") -> None:
# SqlLineageParser lowercarese tablenames and we need to replace Looker specific token which should be uppercased
sql_query = re.sub(
rf"(\${{{self._MYVIEW_LOOKER_TOKEN}}})",
rf"{self._MYVIEW_SQL_TABLE_NAME_TOKEN}",
sql_query,
)
self.sql_query = sql_query
self.schema_resolver = SchemaResolver(platform=platform)
self.result = sqlglot_lineage(sql_query, self.schema_resolver)
def get_tables(self) -> List[str]:
ans = []
for urn in self.result.in_tables:
table_ref = DatasetUrn.from_string(urn)
ans.append(str(table_ref.name))
result = [
self._MYVIEW_LOOKER_TOKEN if c == self._MYVIEW_SQL_TABLE_NAME_TOKEN else c
for c in ans
]
# Sort tables to make the list deterministic
result.sort()
return result
def get_columns(self) -> List[str]:
ans = []
for col_info in self.result.column_lineage or []:
for col_ref in col_info.upstreams:
ans.append(col_ref.column)
return ans
def test_delayed_iter():
@ -121,7 +168,7 @@ def test_sqllineage_sql_parser_get_columns_with_alias_and_count_star():
columns_list = SqlLineageSQLParser(sql_query).get_columns()
columns_list.sort()
assert columns_list == ["a", "b", "count", "test"]
assert columns_list == ["a", "b", "c"]
def test_sqllineage_sql_parser_get_columns_with_more_complex_join():
@ -145,7 +192,7 @@ WHERE
columns_list = SqlLineageSQLParser(sql_query).get_columns()
columns_list.sort()
assert columns_list == ["bs", "pi", "pt", "pu", "v"]
assert columns_list == ["bs", "pi", "tt", "tt", "v"]
def test_sqllineage_sql_parser_get_columns_complex_query_with_union():
@ -198,7 +245,7 @@ date :: date) <= 7
columns_list = SqlLineageSQLParser(sql_query).get_columns()
columns_list.sort()
assert columns_list == ["c", "date", "e", "u", "x"]
assert columns_list == ["c", "c", "e", "e", "e", "e", "u", "u", "x", "x"]
def test_sqllineage_sql_parser_get_tables_from_templated_query():
@ -239,7 +286,7 @@ def test_sqllineage_sql_parser_with_weird_lookml_query():
"""
columns_list = SqlLineageSQLParser(sql_query).get_columns()
columns_list.sort()
assert columns_list == ["aliased_platform", "country", "date"]
assert columns_list == []
def test_sqllineage_sql_parser_tables_from_redash_query():
@ -276,13 +323,7 @@ JOIN `admin-table` a on d.`column-date` = a.`column-admin`
"hour-table",
"timestamp-table",
]
expected_columns = [
"column-admin",
"column-data",
"column-date",
"column-hour",
"column-timestamp",
]
expected_columns: List[str] = []
assert sorted(SqlLineageSQLParser(sql_query).get_tables()) == expected_tables
assert sorted(SqlLineageSQLParser(sql_query).get_columns()) == expected_columns