diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index bcc89332cc..d8fe06abad 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -19,19 +19,21 @@ This file documents any backwards-incompatible changes in DataHub and assists pe ## Next - #11560 - The PowerBI ingestion source configuration option include_workspace_name_in_dataset_urn determines whether the workspace name is included in the PowerBI dataset's URN.
PowerBI allows to have identical name of semantic model and their tables across the workspace, It will overwrite the semantic model in-case of multi-workspace ingestion.
- Entity urn with `include_workspace_name_in_dataset_urn: false` - ``` - urn:li:dataset:(urn:li:dataPlatform:powerbi,[.].,) - ``` + Entity urn with `include_workspace_name_in_dataset_urn: false` - Entity urn with `include_workspace_name_in_dataset_urn: true` - ``` - urn:li:dataset:(urn:li:dataPlatform:powerbi,[.]...,) - ``` + ``` + urn:li:dataset:(urn:li:dataPlatform:powerbi,[.].,) + ``` + + Entity urn with `include_workspace_name_in_dataset_urn: true` + + ``` + urn:li:dataset:(urn:li:dataPlatform:powerbi,[.]...,) + ``` The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatiblity, However, we recommend enabling this flag after performing the necessary cleanup. If stateful ingestion is enabled, running ingestion with the latest CLI version will handle the cleanup automatically. Otherwise, we recommend soft deleting all powerbi data via the DataHub CLI: - `datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true. + `datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true. - #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting directly within `sources_to_platform_instance..database`. - #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information. @@ -48,6 +50,9 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #11619 - schema field/column paths can no longer be duplicated within the schema - #11570 - The `DatahubClientConfig`'s server field no longer defaults to `http://localhost:8080`. Be sure to explicitly set this. - #11570 - If a `datahub_api` is explicitly passed to a stateful ingestion config provider, it will be used. We previously ignored it if the pipeline context also had a graph object. +- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted (after 10d) or are timeseries _entities_ (dataprocess, execution requests) will be removed automatically using logic in the `datahub-gc` ingestion source. +- #12020 - Removed `sql_parser` configuration from the Redash source, as Redash now exclusively uses the sqlglot-based parser for lineage extraction. +- #12020 - Removed `datahub.utilities.sql_parser`, `datahub.utilities.sql_parser_base` and `datahub.utilities.sql_lineage_parser_impl` module along with `SqlLineageSQLParser` and `DefaultSQLParser`. Use `create_lineage_sql_parsed_result` from `datahub.sql_parsing.sqlglot_lineage` module instead. - #11518 - DataHub Garbage Collection: Various entities that are soft-deleted (after 10d) or are timeseries *entities* (dataprocess, execution requests) will be removed automatically using logic in the `datahub-gc` ingestion diff --git a/metadata-ingestion-modules/gx-plugin/setup.py b/metadata-ingestion-modules/gx-plugin/setup.py index e87bbded96..73d5d1a9a0 100644 --- a/metadata-ingestion-modules/gx-plugin/setup.py +++ b/metadata-ingestion-modules/gx-plugin/setup.py @@ -15,15 +15,6 @@ def get_long_description(): rest_common = {"requests", "requests_file"} -# TODO: Can we move away from sqllineage and use sqlglot ?? -sqllineage_lib = { - "sqllineage==1.3.8", - # We don't have a direct dependency on sqlparse but it is a dependency of sqllineage. - # There have previously been issues from not pinning sqlparse, so it's best to pin it. - # Related: https://github.com/reata/sqllineage/issues/361 and https://github.com/reata/sqllineage/pull/360 - "sqlparse==0.4.4", -} - _version: str = package_metadata["__version__"] _self_pin = ( f"=={_version}" @@ -43,8 +34,7 @@ base_requirements = { # https://github.com/ipython/traitlets/issues/741 "traitlets<5.2.2", *rest_common, - *sqllineage_lib, - f"acryl-datahub[datahub-rest]{_self_pin}", + f"acryl-datahub[datahub-rest,sql-parser]{_self_pin}", } mypy_stubs = { diff --git a/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py b/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py index 2ad301a38d..2d89d26997 100644 --- a/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py +++ b/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py @@ -34,8 +34,9 @@ from datahub.metadata.com.linkedin.pegasus2avro.assertion import ( ) from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance from datahub.metadata.schema_classes import PartitionSpecClass, PartitionTypeClass +from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result from datahub.utilities._markupsafe_compat import MARKUPSAFE_PATCHED -from datahub.utilities.sql_parser import DefaultSQLParser +from datahub.utilities.urns.dataset_urn import DatasetUrn from great_expectations.checkpoint.actions import ValidationAction from great_expectations.core.batch import Batch from great_expectations.core.batch_spec import ( @@ -677,10 +678,23 @@ class DataHubValidationAction(ValidationAction): query=query, customProperties=batchSpecProperties, ) - try: - tables = DefaultSQLParser(query).get_tables() - except Exception as e: - logger.warning(f"Sql parser failed on {query} with {e}") + + data_platform = get_platform_from_sqlalchemy_uri(str(sqlalchemy_uri)) + sql_parser_in_tables = create_lineage_sql_parsed_result( + query=query, + platform=data_platform, + env=self.env, + platform_instance=None, + default_db=None, + ) + tables = [ + DatasetUrn.from_string(table_urn).name + for table_urn in sql_parser_in_tables.in_tables + ] + if sql_parser_in_tables.debug_info.table_error: + logger.warning( + f"Sql parser failed on {query} with {sql_parser_in_tables.debug_info.table_error}" + ) tables = [] if len(set(tables)) != 1: diff --git a/metadata-ingestion/docs/sources/redash/redash.md b/metadata-ingestion/docs/sources/redash/redash.md index 8f8c5c8549..f23a523ceb 100644 --- a/metadata-ingestion/docs/sources/redash/redash.md +++ b/metadata-ingestion/docs/sources/redash/redash.md @@ -1,5 +1,2 @@ -Note! The integration can use an SQL parser to try to parse the tables the chart depends on. This parsing is disabled by default, -but can be enabled by setting `parse_table_names_from_sql: true`. The default parser is based on the [`sqllineage`](https://pypi.org/project/sqllineage/) package. -As this package doesn't officially support all the SQL dialects that Redash supports, the result might not be correct. You can, however, implement a -custom parser and take it into use by setting the `sql_parser` configuration value. A custom SQL parser must inherit from `datahub.utilities.sql_parser.SQLParser` -and must be made available to Datahub by ,for example, installing it. The configuration then needs to be set to `module_name.ClassName` of the parser. +Note! The integration can use an SQL parser to try to parse the tables the chart depends on. This parsing is disabled by default, +but can be enabled by setting `parse_table_names_from_sql: true`. The parser is based on the [`sqlglot`](https://pypi.org/project/sqlglot/) package. diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 5ae5438e21..415871d301 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -159,14 +159,6 @@ sql_common = ( | classification_lib ) -sqllineage_lib = { - "sqllineage==1.3.8", - # We don't have a direct dependency on sqlparse but it is a dependency of sqllineage. - # There have previously been issues from not pinning sqlparse, so it's best to pin it. - # Related: https://github.com/reata/sqllineage/issues/361 and https://github.com/reata/sqllineage/pull/360 - "sqlparse==0.4.4", -} - aws_common = { # AWS Python SDK "boto3", @@ -216,7 +208,6 @@ redshift_common = { "sqlalchemy-redshift>=0.8.3", "GeoAlchemy2", "redshift-connector>=2.1.0", - *sqllineage_lib, *path_spec_common, } @@ -464,9 +455,7 @@ plugins: Dict[str, Set[str]] = { # It's technically wrong for packages to depend on setuptools. However, it seems mlflow does it anyways. "setuptools", }, - "mode": {"requests", "python-liquid", "tenacity>=8.0.1"} - | sqllineage_lib - | sqlglot_lib, + "mode": {"requests", "python-liquid", "tenacity>=8.0.1"} | sqlglot_lib, "mongodb": {"pymongo[srv]>=3.11", "packaging"}, "mssql": sql_common | mssql_common, "mssql-odbc": sql_common | mssql_common | {"pyodbc"}, @@ -482,7 +471,7 @@ plugins: Dict[str, Set[str]] = { | pyhive_common | {"psycopg2-binary", "pymysql>=1.0.2"}, "pulsar": {"requests"}, - "redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib, + "redash": {"redash-toolbelt", "sql-metadata"} | sqlglot_lib, "redshift": sql_common | redshift_common | usage_common @@ -503,9 +492,7 @@ plugins: Dict[str, Set[str]] = { "slack": slack, "superset": superset_common, "preset": superset_common, - # FIXME: I don't think tableau uses sqllineage anymore so we should be able - # to remove that dependency. - "tableau": {"tableauserverclient>=0.24.0"} | sqllineage_lib | sqlglot_lib, + "tableau": {"tableauserverclient>=0.24.0"} | sqlglot_lib, "teradata": sql_common | usage_common | sqlglot_lib @@ -527,9 +514,9 @@ plugins: Dict[str, Set[str]] = { ), "powerbi-report-server": powerbi_report_server, "vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"}, - "unity-catalog": databricks | sql_common | sqllineage_lib, + "unity-catalog": databricks | sql_common, # databricks is alias for unity-catalog and needs to be kept in sync - "databricks": databricks | sql_common | sqllineage_lib, + "databricks": databricks | sql_common, "fivetran": snowflake_common | bigquery_common | sqlglot_lib, "qlik-sense": sqlglot_lib | {"requests", "websocket-client"}, "sigma": sqlglot_lib | {"requests"}, diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index e24cba9b19..c1ab9271ce 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -18,7 +18,6 @@ from pydantic import Field, validator from requests.adapters import HTTPAdapter, Retry from requests.exceptions import ConnectionError from requests.models import HTTPBasicAuth, HTTPError -from sqllineage.runner import LineageRunner from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential import datahub.emitter.mce_builder as builder @@ -820,28 +819,6 @@ class ModeSource(StatefulIngestionSourceBase): ) return None - @lru_cache(maxsize=None) - def _get_source_from_query(self, raw_query: str) -> set: - query = self._replace_definitions(raw_query) - parser = LineageRunner(query) - source_paths = set() - try: - for table in parser.source_tables: - sources = str(table).split(".") - source_schema, source_table = sources[-2], sources[-1] - if source_schema == "": - source_schema = str(self.config.default_schema) - - source_paths.add(f"{source_schema}.{source_table}") - except Exception as e: - self.report.report_failure( - title="Failed to Extract Lineage From Query", - message="Unable to retrieve lineage from Mode query.", - context=f"Query: {raw_query}, Error: {str(e)}", - ) - - return source_paths - def _get_datasource_urn( self, platform: str, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redash.py b/metadata-ingestion/src/datahub/ingestion/source/redash.py index 581e32d29d..f11d194402 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redash.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redash.py @@ -2,7 +2,7 @@ import logging import math import sys from dataclasses import dataclass, field -from typing import Dict, Iterable, List, Optional, Set, Type +from typing import Dict, Iterable, List, Optional, Set import dateutil.parser as dp from packaging import version @@ -22,7 +22,6 @@ from datahub.ingestion.api.decorators import ( # SourceCapability,; capability, platform_name, support_status, ) -from datahub.ingestion.api.registry import import_path from datahub.ingestion.api.source import Source, SourceCapability, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.metadata.com.linkedin.pegasus2avro.common import ( @@ -39,9 +38,9 @@ from datahub.metadata.schema_classes import ( ChartTypeClass, DashboardInfoClass, ) +from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result from datahub.utilities.lossy_collections import LossyDict, LossyList from datahub.utilities.perf_timer import PerfTimer -from datahub.utilities.sql_parser_base import SQLParser from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor logger = logging.getLogger(__name__) @@ -270,10 +269,6 @@ class RedashConfig(ConfigModel): parse_table_names_from_sql: bool = Field( default=False, description="See note below." ) - sql_parser: str = Field( - default="datahub.utilities.sql_parser.DefaultSQLParser", - description="custom SQL parser. See note below for details.", - ) env: str = Field( default=DEFAULT_ENV, @@ -354,7 +349,6 @@ class RedashSource(Source): self.api_page_limit = self.config.api_page_limit or math.inf self.parse_table_names_from_sql = self.config.parse_table_names_from_sql - self.sql_parser_path = self.config.sql_parser logger.info( f"Running Redash ingestion with parse_table_names_from_sql={self.parse_table_names_from_sql}" @@ -380,31 +374,6 @@ class RedashSource(Source): config = RedashConfig.parse_obj(config_dict) return cls(ctx, config) - @classmethod - def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]: - assert "." in sql_parser_path, "sql_parser-path must contain a ." - parser_cls = import_path(sql_parser_path) - - if not issubclass(parser_cls, SQLParser): - raise ValueError(f"must be derived from {SQLParser}; got {parser_cls}") - return parser_cls - - @classmethod - def _get_sql_table_names(cls, sql: str, sql_parser_path: str) -> List[str]: - parser_cls = cls._import_sql_parser_cls(sql_parser_path) - - try: - sql_table_names: List[str] = parser_cls(sql).get_tables() - except Exception as e: - logger.warning(f"Sql parser failed on {sql} with {e}") - return [] - - # Remove quotes from table names - sql_table_names = [t.replace('"', "") for t in sql_table_names] - sql_table_names = [t.replace("`", "") for t in sql_table_names] - - return sql_table_names - def _get_chart_data_source(self, data_source_id: Optional[int] = None) -> Dict: url = f"/api/data_sources/{data_source_id}" resp = self.client._get(url).json() @@ -441,14 +410,6 @@ class RedashSource(Source): return database_name - def _construct_datalineage_urn( - self, platform: str, database_name: str, sql_table_name: str - ) -> str: - full_dataset_name = get_full_qualified_name( - platform, database_name, sql_table_name - ) - return builder.make_dataset_urn(platform, full_dataset_name, self.config.env) - def _get_datasource_urns( self, data_source: Dict, sql_query_data: Dict = {} ) -> Optional[List[str]]: @@ -464,34 +425,23 @@ class RedashSource(Source): # Getting table lineage from SQL parsing if self.parse_table_names_from_sql and data_source_syntax == "sql": dataset_urns = list() - try: - sql_table_names = self._get_sql_table_names( - query, self.sql_parser_path - ) - except Exception as e: + sql_parser_in_tables = create_lineage_sql_parsed_result( + query=query, + platform=platform, + env=self.config.env, + platform_instance=None, + default_db=database_name, + ) + # make sure dataset_urns is not empty list + dataset_urns = sql_parser_in_tables.in_tables + if sql_parser_in_tables.debug_info.table_error: self.report.queries_problem_parsing.add(str(query_id)) self.error( logger, "sql-parsing", - f"exception {e} in parsing query-{query_id}-datasource-{data_source_id}", + f"exception {sql_parser_in_tables.debug_info.table_error} in parsing query-{query_id}-datasource-{data_source_id}", ) - sql_table_names = [] - for sql_table_name in sql_table_names: - try: - dataset_urns.append( - self._construct_datalineage_urn( - platform, database_name, sql_table_name - ) - ) - except Exception: - self.report.queries_problem_parsing.add(str(query_id)) - self.warn( - logger, - "data-urn-invalid", - f"Problem making URN for {sql_table_name} parsed from query {query_id}", - ) - # make sure dataset_urns is not empty list return dataset_urns if len(dataset_urns) > 0 else None else: diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py index 8c42ac81b9..718818d9b3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py @@ -7,7 +7,6 @@ from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, Set, import pyspark from databricks.sdk.service.sql import QueryStatementType -from sqllineage.runner import LineageRunner from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics @@ -22,7 +21,9 @@ from datahub.ingestion.source.unity.proxy_types import ( from datahub.ingestion.source.unity.report import UnityCatalogReport from datahub.ingestion.source.usage.usage_common import UsageAggregator from datahub.metadata.schema_classes import OperationClass +from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint +from datahub.utilities.urns.dataset_urn import DatasetUrn logger = logging.getLogger(__name__) @@ -48,6 +49,7 @@ class UnityCatalogUsageExtractor: proxy: UnityCatalogApiProxy table_urn_builder: Callable[[TableReference], str] user_urn_builder: Callable[[str], str] + platform: str = "databricks" def __post_init__(self): self.usage_aggregator = UsageAggregator[TableReference](self.config) @@ -173,7 +175,7 @@ class UnityCatalogUsageExtractor: self, query: Query, table_map: TableMap ) -> Optional[QueryTableInfo]: with self.report.usage_perf_report.sql_parsing_timer: - table_info = self._parse_query_via_lineage_runner(query.query_text) + table_info = self._parse_query_via_sqlglot(query.query_text) if table_info is None and query.statement_type == QueryStatementType.SELECT: with self.report.usage_perf_report.spark_sql_parsing_timer: table_info = self._parse_query_via_spark_sql_plan(query.query_text) @@ -191,26 +193,33 @@ class UnityCatalogUsageExtractor: ), ) - def _parse_query_via_lineage_runner(self, query: str) -> Optional[StringTableInfo]: + def _parse_query_via_sqlglot(self, query: str) -> Optional[StringTableInfo]: try: - runner = LineageRunner(query) + sql_parser_in_tables = create_lineage_sql_parsed_result( + query=query, + default_db=None, + platform=self.platform, + env=self.config.env, + platform_instance=None, + ) + return GenericTableInfo( source_tables=[ - self._parse_sqllineage_table(table) - for table in runner.source_tables + self._parse_sqlglot_table(table) + for table in sql_parser_in_tables.in_tables ], target_tables=[ - self._parse_sqllineage_table(table) - for table in runner.target_tables + self._parse_sqlglot_table(table) + for table in sql_parser_in_tables.out_tables ], ) except Exception as e: - logger.info(f"Could not parse query via lineage runner, {query}: {e!r}") + logger.info(f"Could not parse query via sqlglot, {query}: {e!r}") return None @staticmethod - def _parse_sqllineage_table(sqllineage_table: object) -> str: - full_table_name = str(sqllineage_table) + def _parse_sqlglot_table(table_urn: str) -> str: + full_table_name = DatasetUrn.from_string(table_urn).name default_schema = "." if full_table_name.startswith(default_schema): return full_table_name[len(default_schema) :] diff --git a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py deleted file mode 100644 index 5a8802c7a0..0000000000 --- a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py +++ /dev/null @@ -1,160 +0,0 @@ -import contextlib -import logging -import re -import unittest -import unittest.mock -from typing import Dict, List, Optional, Set - -from sqllineage.core.holders import Column, SQLLineageHolder -from sqllineage.exceptions import SQLLineageException - -from datahub.utilities.sql_parser_base import SQLParser, SqlParserException - -with contextlib.suppress(ImportError): - import sqlparse - from networkx import DiGraph - from sqllineage.core import LineageAnalyzer - - import datahub.utilities.sqllineage_patch -logger = logging.getLogger(__name__) - - -class SqlLineageSQLParserImpl(SQLParser): - _DATE_SWAP_TOKEN = "__d_a_t_e" - _HOUR_SWAP_TOKEN = "__h_o_u_r" - _TIMESTAMP_SWAP_TOKEN = "__t_i_m_e_s_t_a_m_p" - _DATA_SWAP_TOKEN = "__d_a_t_a" - _ADMIN_SWAP_TOKEN = "__a_d_m_i_n" - _MYVIEW_SQL_TABLE_NAME_TOKEN = "__my_view__.__sql_table_name__" - _MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME" - - def __init__(self, sql_query: str, use_raw_names: bool = False) -> None: - super().__init__(sql_query) - original_sql_query = sql_query - self._use_raw_names = use_raw_names - - # SqlLineageParser makes mistakes on lateral flatten queries, use the prefix - if "lateral flatten" in sql_query: - sql_query = sql_query[: sql_query.find("lateral flatten")] - - # Replace reserved words that break SqlLineageParser - self.token_to_original: Dict[str, str] = { - self._DATE_SWAP_TOKEN: "date", - self._HOUR_SWAP_TOKEN: "hour", - self._TIMESTAMP_SWAP_TOKEN: "timestamp", - self._DATA_SWAP_TOKEN: "data", - self._ADMIN_SWAP_TOKEN: "admin", - } - for replacement, original in self.token_to_original.items(): - # Replace original tokens with replacement. Since table and column name can contain a hyphen('-'), - # also prevent original tokens appearing as part of these names with a hyphen from getting substituted. - sql_query = re.sub( - rf"((? List[str]: - result: List[str] = [] - if self._sql_holder is None: - logger.error("sql holder not present so cannot get tables") - return result - for table in self._sql_holder.source_tables: - table_normalized = re.sub( - r"^.", - "", - ( - str(table) - if not self._use_raw_names - else f"{table.schema.raw_name}.{table.raw_name}" - ), - ) - result.append(str(table_normalized)) - - # We need to revert TOKEN replacements - for token, replacement in self.token_to_original.items(): - result = [replacement if c == token else c for c in result] - result = [ - self._MYVIEW_LOOKER_TOKEN if c == self._MYVIEW_SQL_TABLE_NAME_TOKEN else c - for c in result - ] - - # Sort tables to make the list deterministic - result.sort() - - return result - - def get_columns(self) -> List[str]: - if self._sql_holder is None: - raise SqlParserException("sql holder not present so cannot get columns") - graph: DiGraph = self._sql_holder.graph # For mypy attribute checking - column_nodes = [n for n in graph.nodes if isinstance(n, Column)] - column_graph = graph.subgraph(column_nodes) - - target_columns = {column for column, deg in column_graph.out_degree if deg == 0} - - result: Set[str] = set() - for column in target_columns: - # Let's drop all the count(*) and similard columns which are expression actually if it does not have an alias - if not any(ele in column.raw_name for ele in ["*", "(", ")"]): - result.add(str(column.raw_name)) - - # Reverting back all the previously renamed words which confuses the parser - result = {"date" if c == self._DATE_SWAP_TOKEN else c for c in result} - result = { - "timestamp" if c == self._TIMESTAMP_SWAP_TOKEN else c for c in list(result) - } - - # swap back renamed date column - return list(result) diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser.py b/metadata-ingestion/src/datahub/utilities/sql_parser.py deleted file mode 100644 index b88f8fd8c7..0000000000 --- a/metadata-ingestion/src/datahub/utilities/sql_parser.py +++ /dev/null @@ -1,94 +0,0 @@ -import logging -import multiprocessing -import traceback -from multiprocessing import Process, Queue -from typing import Any, List, Optional, Tuple - -from datahub.utilities.sql_lineage_parser_impl import SqlLineageSQLParserImpl -from datahub.utilities.sql_parser_base import SQLParser - -logger = logging.getLogger(__name__) - - -def sql_lineage_parser_impl_func_wrapper( - queue: Optional[multiprocessing.Queue], sql_query: str, use_raw_names: bool = False -) -> Optional[Tuple[List[str], List[str], Any]]: - """ - The wrapper function that computes the tables and columns using the SqlLineageSQLParserImpl - and puts the results on the shared IPC queue. This is used to isolate SqlLineageSQLParserImpl - functionality in a separate process, and hence protect our sources from memory leaks originating in - the sqllineage module. - :param queue: The shared IPC queue on to which the results will be put. - :param sql_query: The SQL query to extract the tables & columns from. - :param use_raw_names: Parameter used to ignore sqllineage's default lowercasing. - :return: None. - """ - exception_details: Optional[Tuple[BaseException, str]] = None - tables: List[str] = [] - columns: List[str] = [] - try: - parser = SqlLineageSQLParserImpl(sql_query, use_raw_names) - tables = parser.get_tables() - columns = parser.get_columns() - except BaseException as e: - exc_msg = traceback.format_exc() - exception_details = (e, exc_msg) - logger.debug(exc_msg) - - if queue is not None: - queue.put((tables, columns, exception_details)) - return None - else: - return (tables, columns, exception_details) - - -class SqlLineageSQLParser(SQLParser): - def __init__( - self, - sql_query: str, - use_external_process: bool = False, - use_raw_names: bool = False, - ) -> None: - super().__init__(sql_query, use_external_process) - if use_external_process: - self.tables, self.columns = self._get_tables_columns_process_wrapped( - sql_query, use_raw_names - ) - else: - return_tuple = sql_lineage_parser_impl_func_wrapper( - None, sql_query, use_raw_names - ) - if return_tuple is not None: - ( - self.tables, - self.columns, - some_exception, - ) = return_tuple - - @staticmethod - def _get_tables_columns_process_wrapped( - sql_query: str, use_raw_names: bool = False - ) -> Tuple[List[str], List[str]]: - # Invoke sql_lineage_parser_impl_func_wrapper in a separate process to avoid - # memory leaks from sqllineage module used by SqlLineageSQLParserImpl. This will help - # shield our sources like lookml & redash, that need to parse a large number of SQL statements, - # from causing significant memory leaks in the datahub cli during ingestion. - queue: multiprocessing.Queue = Queue() - process: multiprocessing.Process = Process( - target=sql_lineage_parser_impl_func_wrapper, - args=(queue, sql_query, use_raw_names), - ) - process.start() - tables, columns, exception_details = queue.get(block=True) - if exception_details is not None: - raise exception_details[0](f"Sub-process exception: {exception_details[1]}") - return tables, columns - - def get_tables(self) -> List[str]: - return self.tables - - def get_columns(self) -> List[str]: - return self.columns - - -DefaultSQLParser = SqlLineageSQLParser diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser_base.py b/metadata-ingestion/src/datahub/utilities/sql_parser_base.py deleted file mode 100644 index 8fd5dfaf49..0000000000 --- a/metadata-ingestion/src/datahub/utilities/sql_parser_base.py +++ /dev/null @@ -1,21 +0,0 @@ -from abc import ABCMeta, abstractmethod -from typing import List - - -class SqlParserException(Exception): - """Raised when sql parser fails""" - - pass - - -class SQLParser(metaclass=ABCMeta): - def __init__(self, sql_query: str, use_external_process: bool = True) -> None: - self._sql_query = sql_query - - @abstractmethod - def get_tables(self) -> List[str]: - pass - - @abstractmethod - def get_columns(self) -> List[str]: - pass diff --git a/metadata-ingestion/tests/unit/test_redash_source.py b/metadata-ingestion/tests/unit/test_redash_source.py index 2982fe76c4..32ab200847 100644 --- a/metadata-ingestion/tests/unit/test_redash_source.py +++ b/metadata-ingestion/tests/unit/test_redash_source.py @@ -710,9 +710,9 @@ def test_get_chart_snapshot_parse_table_names_from_sql(mocked_data_source): ), chartUrl="http://localhost:5000/queries/4#10", inputs=[ - "urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.order_items,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.orders,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.staffs,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.order_items,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.orders,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.staffs,PROD)", ], type="PIE", ) diff --git a/metadata-ingestion/tests/unit/utilities/test_utilities.py b/metadata-ingestion/tests/unit/utilities/test_utilities.py index 68da1bc1c0..91819bff41 100644 --- a/metadata-ingestion/tests/unit/utilities/test_utilities.py +++ b/metadata-ingestion/tests/unit/utilities/test_utilities.py @@ -1,8 +1,55 @@ import doctest +import re +from typing import List +from datahub.sql_parsing.schema_resolver import SchemaResolver +from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage from datahub.utilities.delayed_iter import delayed_iter from datahub.utilities.is_pytest import is_pytest_running -from datahub.utilities.sql_parser import SqlLineageSQLParser +from datahub.utilities.urns.dataset_urn import DatasetUrn + + +class SqlLineageSQLParser: + """ + It uses `sqlglot_lineage` to extract tables and columns, serving as a replacement for the `sqllineage` implementation, similar to BigQuery. + Reference: [BigQuery SQL Lineage Test](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/tests/unit/bigquery/test_bigquery_sql_lineage.py#L8). + """ + + _MYVIEW_SQL_TABLE_NAME_TOKEN = "__my_view__.__sql_table_name__" + _MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME" + + def __init__(self, sql_query: str, platform: str = "bigquery") -> None: + # SqlLineageParser lowercarese tablenames and we need to replace Looker specific token which should be uppercased + sql_query = re.sub( + rf"(\${{{self._MYVIEW_LOOKER_TOKEN}}})", + rf"{self._MYVIEW_SQL_TABLE_NAME_TOKEN}", + sql_query, + ) + self.sql_query = sql_query + self.schema_resolver = SchemaResolver(platform=platform) + self.result = sqlglot_lineage(sql_query, self.schema_resolver) + + def get_tables(self) -> List[str]: + ans = [] + for urn in self.result.in_tables: + table_ref = DatasetUrn.from_string(urn) + ans.append(str(table_ref.name)) + + result = [ + self._MYVIEW_LOOKER_TOKEN if c == self._MYVIEW_SQL_TABLE_NAME_TOKEN else c + for c in ans + ] + # Sort tables to make the list deterministic + result.sort() + + return result + + def get_columns(self) -> List[str]: + ans = [] + for col_info in self.result.column_lineage or []: + for col_ref in col_info.upstreams: + ans.append(col_ref.column) + return ans def test_delayed_iter(): @@ -121,7 +168,7 @@ def test_sqllineage_sql_parser_get_columns_with_alias_and_count_star(): columns_list = SqlLineageSQLParser(sql_query).get_columns() columns_list.sort() - assert columns_list == ["a", "b", "count", "test"] + assert columns_list == ["a", "b", "c"] def test_sqllineage_sql_parser_get_columns_with_more_complex_join(): @@ -145,7 +192,7 @@ WHERE columns_list = SqlLineageSQLParser(sql_query).get_columns() columns_list.sort() - assert columns_list == ["bs", "pi", "pt", "pu", "v"] + assert columns_list == ["bs", "pi", "tt", "tt", "v"] def test_sqllineage_sql_parser_get_columns_complex_query_with_union(): @@ -198,7 +245,7 @@ date :: date) <= 7 columns_list = SqlLineageSQLParser(sql_query).get_columns() columns_list.sort() - assert columns_list == ["c", "date", "e", "u", "x"] + assert columns_list == ["c", "c", "e", "e", "e", "e", "u", "u", "x", "x"] def test_sqllineage_sql_parser_get_tables_from_templated_query(): @@ -239,7 +286,7 @@ def test_sqllineage_sql_parser_with_weird_lookml_query(): """ columns_list = SqlLineageSQLParser(sql_query).get_columns() columns_list.sort() - assert columns_list == ["aliased_platform", "country", "date"] + assert columns_list == [] def test_sqllineage_sql_parser_tables_from_redash_query(): @@ -276,13 +323,7 @@ JOIN `admin-table` a on d.`column-date` = a.`column-admin` "hour-table", "timestamp-table", ] - expected_columns = [ - "column-admin", - "column-data", - "column-date", - "column-hour", - "column-timestamp", - ] + expected_columns: List[str] = [] assert sorted(SqlLineageSQLParser(sql_query).get_tables()) == expected_tables assert sorted(SqlLineageSQLParser(sql_query).get_columns()) == expected_columns