diff --git a/.github/workflows/py-checkstyle.yml b/.github/workflows/py-checkstyle.yml index 036c9857af5..0789928b8df 100644 --- a/.github/workflows/py-checkstyle.yml +++ b/.github/workflows/py-checkstyle.yml @@ -88,7 +88,7 @@ jobs: body: | **The Python checkstyle failed.** - Please run `make py_format` in the root of your repository and commit the changes to this PR. + Please run `make py_format` and `py_format_check` in the root of your repository and commit the changes to this PR. You can also use [pre-commit](https://pre-commit.com/) to automate the Python code formatting. You can install the pre-commit hooks with `make install_test precommit_install`. diff --git a/ingestion/setup.py b/ingestion/setup.py index 257d3fc7aa5..06dc8bcc449 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -94,7 +94,7 @@ base_requirements = { "requests-aws4auth~=1.1", # Only depends on requests as external package. Leaving as base. "setuptools~=65.6.3", "sqlalchemy>=1.4.0", - "sqllineage==1.3.7", + "openmetadata-sqllineage==1.0.0", "typing-compat~=0.1.0", # compatibility requirements for 3.7 "typing-inspect", "wheel~=0.38.4", diff --git a/ingestion/src/metadata/ingestion/api/workflow.py b/ingestion/src/metadata/ingestion/api/workflow.py index 21d3a207fa9..59b2f9c8098 100644 --- a/ingestion/src/metadata/ingestion/api/workflow.py +++ b/ingestion/src/metadata/ingestion/api/workflow.py @@ -132,7 +132,11 @@ class Workflow(WorkflowStatusMixin): processor_class = import_processor_class(processor_type=processor_type) processor_config = self.config.processor.dict().get("config", {}) self.processor: Processor = processor_class.create( - processor_config, metadata_config + processor_config, + metadata_config, + connection_type=str( + self.config.source.serviceConnection.__root__.config.type.value + ), ) logger.debug( f"Processor Type: {processor_type}, {processor_class} configured" diff --git a/ingestion/src/metadata/ingestion/lineage/models.py b/ingestion/src/metadata/ingestion/lineage/models.py new file mode 100644 index 00000000000..4d3eaeea579 --- /dev/null +++ b/ingestion/src/metadata/ingestion/lineage/models.py @@ -0,0 +1,110 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Models related to lineage parsing +""" +from enum import Enum +from typing import Dict + +from metadata.generated.schema.entity.services.connections.database.athenaConnection import ( + AthenaType, +) +from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( + BigqueryType, +) +from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import ( + ClickhouseType, +) +from metadata.generated.schema.entity.services.connections.database.db2Connection import ( + Db2Type, +) +from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import ( + DeltaLakeType, +) +from metadata.generated.schema.entity.services.connections.database.hiveConnection import ( + HiveType, +) +from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( + MySQLType, +) +from metadata.generated.schema.entity.services.connections.database.oracleConnection import ( + OracleType, +) +from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( + PostgresType, +) +from metadata.generated.schema.entity.services.connections.database.redshiftConnection import ( + RedshiftType, +) +from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import ( + SnowflakeType, +) +from metadata.generated.schema.entity.services.connections.database.sqliteConnection import ( + SQLiteType, +) + + +class Dialect(Enum): + """ + Supported dialects by sqllineage + """ + + ANSI = "ansi" + ATHENA = "athena" + BIGQUERY = "bigquery" + CLICKHOUSE = "clickhouse" + DB2 = "db2" + DUCKDB = "duckdb" + EXASOL = "exasol" + HIVE = "hive" + MATERIALIZE = "materialize" + MYSQL = "mysql" + ORACLE = "oracle" + POSTGRES = "postgres" + REDSHIFT = "redshift" + SNOWFLAKE = "snowflake" + SOQL = "soql" + SPARKSQL = "sparksql" + SQLITE = "sqlite" + TERADATA = "teradata" + TSQL = "tsql" + + +MAP_CONNECTION_TYPE_DIALECT: Dict[str, Dialect] = { + str(AthenaType.Athena.value): Dialect.ATHENA, + str(BigqueryType.BigQuery.value): Dialect.BIGQUERY, + str(ClickhouseType.Clickhouse.value): Dialect.CLICKHOUSE, + str(Db2Type.Db2.value): Dialect.DB2, + str(HiveType.Hive.value): Dialect.HIVE, + str(MySQLType.Mysql.value): Dialect.MYSQL, + str(OracleType.Oracle.value): Dialect.ORACLE, + str(PostgresType.Postgres.value): Dialect.POSTGRES, + str(RedshiftType.Redshift.value): Dialect.REDSHIFT, + str(SnowflakeType.Snowflake.value): Dialect.SNOWFLAKE, + str(DeltaLakeType.DeltaLake.value): Dialect.SPARKSQL, + str(SQLiteType.SQLite.value): Dialect.SQLITE, +} + + +class ConnectionTypeDialectMapper: + """ + Auxiliary class to handle the mapping between a connection type and a dialect used to analyze lineage + """ + + @staticmethod + def dialect_of(connection_type: str) -> Dialect: + """ + Returns dialect for a given connection_type + Args: + connection_type: the connection type as string + Returns: a dialect + """ + return MAP_CONNECTION_TYPE_DIALECT.get(connection_type, Dialect.ANSI) diff --git a/ingestion/src/metadata/ingestion/lineage/parser.py b/ingestion/src/metadata/ingestion/lineage/parser.py index 384f06e47d4..cbcace36d2c 100644 --- a/ingestion/src/metadata/ingestion/lineage/parser.py +++ b/ingestion/src/metadata/ingestion/lineage/parser.py @@ -15,12 +15,13 @@ import traceback from collections import defaultdict from copy import deepcopy from logging.config import DictConfigurator -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple, Union from cached_property import cached_property from sqlparse.sql import Comparison, Identifier, Statement from metadata.generated.schema.type.tableUsageCount import TableColumn, TableColumnJoin +from metadata.ingestion.lineage.models import Dialect from metadata.utils.helpers import ( find_in_iter, get_formatted_entity_name, @@ -34,9 +35,10 @@ from metadata.utils.logger import ingestion_logger # pylint: disable=wrong-import-position configure = DictConfigurator.configure DictConfigurator.configure = lambda _: None -from sqllineage.core.models import Column, Table +from sqllineage.core.models import Column, Schema, Table from sqllineage.exceptions import SQLLineageException from sqllineage.runner import LineageRunner +from sqllineage.sqlfluff_core.models import SqlFluffTable # Reverting changes after import is done DictConfigurator.configure = configure @@ -53,10 +55,10 @@ class LineageParser: query: str _clean_query: str - def __init__(self, query: str): + def __init__(self, query: str, dialect: Dialect = Dialect.ANSI): self.query = query self._clean_query = self.clean_raw_query(query) - self.parser = LineageRunner(self._clean_query) + self.parser = self._evaluate_best_parser(self._clean_query, dialect=dialect) @cached_property def involved_tables(self) -> Optional[List[Table]]: @@ -104,11 +106,20 @@ class LineageParser: return self.retrieve_tables(self.parser.target_tables) @cached_property - def column_lineage(self) -> List[Tuple[Column, Column]]: + def column_lineage(self) -> List[Union[Tuple[Column, Column]]]: """ Get a list of tuples of column lineage """ - return self.parser.get_column_lineage() + if self.parser._use_sqlparse: # pylint: disable=protected-access + return self.parser.get_column_lineage() + column_lineage = [] + for src_column, tgt_column in self.parser.get_column_lineage(): + src_col = Column(src_column.raw_name) + src_col._parent = src_column._parent # pylint: disable=protected-access + tgt_col = Column(tgt_column.raw_name) + tgt_col._parent = tgt_column._parent # pylint: disable=protected-access + column_lineage.append((src_col, tgt_col)) + return column_lineage @cached_property def clean_table_list(self) -> List[str]: @@ -297,7 +308,9 @@ class LineageParser: if not self._clean_query: return [] return [ - self.clean_table_name(table) for table in tables if isinstance(table, Table) + self.clean_table_name(table) + for table in tables + if isinstance(table, (Table, SqlFluffTable)) ] @classmethod @@ -334,7 +347,7 @@ class LineageParser: return clean_query.strip() @staticmethod - def clean_table_name(table: Table) -> Table: + def clean_table_name(table: Union[Table, SqlFluffTable]) -> Table: """ Clean table name by: - Removing brackets from the beginning and end of the table and schema name @@ -345,7 +358,14 @@ class LineageParser: Returns: Copy of the table object with cleaned names """ - clean_table = deepcopy(table) + # keep using Table object + if isinstance(table, SqlFluffTable): + clean_table = Table("") + clean_table.raw_name = table.raw_name + clean_table.alias = table.alias + clean_table.schema = Schema(table.schema.raw_name) + else: + clean_table = deepcopy(table) if insensitive_match(clean_table.raw_name, r"\[.*\]"): clean_table.raw_name = insensitive_replace( clean_table.raw_name, r"\[(.*)\]", r"\1" @@ -357,3 +377,49 @@ class LineageParser: clean_table.schema.raw_name, r"\[(.*)\]", r"\1" ) return clean_table + + @staticmethod + def _evaluate_best_parser( + query: str, dialect: Dialect = Dialect.ANSI + ) -> LineageRunner: + sqlfluff_count = 0 + try: + lr_sqlfluff = LineageRunner( + query, dialect=dialect.value, use_sqlparse=False + ) + sqlfluff_count = len(lr_sqlfluff.get_column_lineage()) + len( + set(lr_sqlfluff.source_tables).union( + set(lr_sqlfluff.target_tables).union( + set(lr_sqlfluff.intermediate_tables) + ) + ) + ) + except Exception as exc: + logger.warning( + f"Lineage with SqlFluff failed for the query [{query}]: {exc}" + ) + lr_sqlfluff = None + + sqlparser_count = 0 + lr_sqlparser = LineageRunner(query, dialect=dialect.value) + try: + sqlparser_count = len(lr_sqlparser.get_column_lineage()) + len( + set(lr_sqlparser.source_tables).union( + set(lr_sqlparser.target_tables).union( + set(lr_sqlparser.intermediate_tables) + ) + ) + ) + except Exception: + # if both runner have failed we return the usual one + return lr_sqlfluff if lr_sqlfluff else lr_sqlparser + + if lr_sqlfluff: + # if sqlparser retrieve more lineage info that sqlfluff + if sqlparser_count > sqlfluff_count: + logger.debug( + f"Lineage computed with SqlFluff did not perform as expected for query: [{query}]" + ) + return lr_sqlparser + return lr_sqlfluff + return lr_sqlparser diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index 839c85a7cd5..a0b0c765772 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -22,6 +22,7 @@ from metadata.generated.schema.type.entityLineage import ( LineageDetails, ) from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.lineage.models import Dialect from metadata.ingestion.lineage.parser import LineageParser from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn @@ -325,6 +326,7 @@ def get_lineage_by_query( database_name: Optional[str], schema_name: Optional[str], query: str, + dialect: Dialect, ) -> Optional[Iterator[AddLineageRequest]]: """ This method parses the query to get source, target and intermediate table names to create lineage, @@ -334,7 +336,7 @@ def get_lineage_by_query( try: logger.debug(f"Running lineage with query: {query}") - lineage_parser = LineageParser(query) + lineage_parser = LineageParser(query, dialect) raw_column_lineage = lineage_parser.column_lineage column_lineage.update(populate_column_lineage_map(raw_column_lineage)) @@ -387,6 +389,7 @@ def get_lineage_via_table_entity( schema_name: str, service_name: str, query: str, + dialect: Dialect, ) -> Optional[Iterator[AddLineageRequest]]: """Get lineage from table entity @@ -397,6 +400,7 @@ def get_lineage_via_table_entity( schema_name (str): name of the schema service_name (str): name of the service query (str): query used for lineage + dialect (str): dialect used for lineage Returns: Optional[Iterator[AddLineageRequest]] @@ -408,7 +412,7 @@ def get_lineage_via_table_entity( try: logger.debug(f"Getting lineage via table entity using query: {query}") - lineage_parser = LineageParser(query) + lineage_parser = LineageParser(query, dialect) to_table_name = table_entity.name.__root__ for from_table_name in lineage_parser.source_tables: diff --git a/ingestion/src/metadata/ingestion/ometa/client.py b/ingestion/src/metadata/ingestion/ometa/client.py index c2f3addfa0a..afd5dfbca4c 100644 --- a/ingestion/src/metadata/ingestion/ometa/client.py +++ b/ingestion/src/metadata/ingestion/ometa/client.py @@ -14,7 +14,7 @@ Python API REST wrapper and helpers import datetime import time import traceback -from typing import Callable, List, Optional, Union +from typing import Callable, Dict, List, Optional, Union import requests from requests.exceptions import HTTPError @@ -165,7 +165,7 @@ class REST: # Example: "Proxy-Authorization": "%(Authorization)s" # This will result in the Authorization value being set for the Proxy-Authorization Extra Header if self.config.extra_headers: - extra_headers: dict[str, str] = self.config.extra_headers + extra_headers: Dict[str, str] = self.config.extra_headers extra_headers = {k: (v % headers) for k, v in extra_headers.items()} logger.debug("Extra headers provided '%s'", extra_headers) headers = {**headers, **extra_headers} diff --git a/ingestion/src/metadata/ingestion/processor/query_parser.py b/ingestion/src/metadata/ingestion/processor/query_parser.py index a2718d89814..e8061c55823 100644 --- a/ingestion/src/metadata/ingestion/processor/query_parser.py +++ b/ingestion/src/metadata/ingestion/processor/query_parser.py @@ -23,18 +23,20 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata from metadata.generated.schema.type.queryParserData import ParsedData, QueryParserData from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery from metadata.ingestion.api.processor import Processor, ProcessorStatus +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect from metadata.ingestion.lineage.parser import LineageParser from metadata.utils.logger import ingestion_logger logger = ingestion_logger() -def parse_sql_statement(record: TableQuery) -> Optional[ParsedData]: +def parse_sql_statement(record: TableQuery, dialect: Dialect) -> Optional[ParsedData]: """ Use the lineage parser and work with the tokens to convert a RAW SQL statement into QueryParserData. :param record: TableQuery from usage + :param dialect: dialect used to compute lineage :return: QueryParserData """ @@ -44,7 +46,7 @@ def parse_sql_statement(record: TableQuery) -> Optional[ParsedData]: str(record.analysisDate), "%Y-%m-%d %H:%M:%S" ).date() - lineage_parser = LineageParser(record.query) + lineage_parser = LineageParser(record.query, dialect=dialect) if not lineage_parser.involved_tables: return None @@ -69,6 +71,7 @@ class QueryParserProcessor(Processor): Args: config (QueryParserProcessorConfig): metadata_config (MetadataServerConfig): + connection_type (str): Attributes: config (QueryParserProcessorConfig): @@ -83,18 +86,21 @@ class QueryParserProcessor(Processor): self, config: ConfigModel, metadata_config: OpenMetadataConnection, + connection_type: str, ): self.config = config self.metadata_config = metadata_config self.status = ProcessorStatus() + self.connection_type = connection_type @classmethod def create( cls, config_dict: dict, metadata_config: OpenMetadataConnection, **kwargs ): config = ConfigModel.parse_obj(config_dict) - return cls(config, metadata_config) + connection_type = kwargs.pop("connection_type", "") + return cls(config, metadata_config, connection_type) def process( # pylint: disable=arguments-differ self, queries: TableQueries @@ -103,7 +109,10 @@ class QueryParserProcessor(Processor): data = [] for record in queries.queries: try: - parsed_sql = parse_sql_statement(record) + parsed_sql = parse_sql_statement( + record, + ConnectionTypeDialectMapper.dialect_of(self.connection_type), + ) if parsed_sql: data.append(parsed_sql) except Exception as exc: diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 4f61858bcba..b2990cfa5bf 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -39,6 +39,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper from metadata.ingestion.lineage.parser import LineageParser from metadata.ingestion.lineage.sql_lineage import ( get_lineage_by_query, @@ -420,7 +421,9 @@ class CommonDbSourceService( ) try: - lineage_parser = LineageParser(view_definition) + connection_type = str(self.service_connection.type.value) + dialect = ConnectionTypeDialectMapper.dialect_of(connection_type) + lineage_parser = LineageParser(view_definition, dialect) if lineage_parser.source_tables and lineage_parser.target_tables: yield from get_lineage_by_query( self.metadata, @@ -428,6 +431,7 @@ class CommonDbSourceService( service_name=self.context.database_service.name.__root__, database_name=db_name, schema_name=schema_name, + dialect=dialect, ) or [] else: @@ -438,6 +442,7 @@ class CommonDbSourceService( database_name=db_name, schema_name=schema_name, query=view_definition, + dialect=dialect, ) or [] except Exception as exc: logger.debug(traceback.format_exc()) diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index 141f804c51f..863607499b3 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -61,6 +61,7 @@ from metadata.generated.schema.type.tagLabel import ( TagSource, ) from metadata.ingestion.api.source import SourceStatus +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -537,12 +538,17 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods query = ( f"create table {query_fqn} as {data_model_link.datamodel.sql.__root__}" ) + connection_type = str( + self.config.serviceConnection.__root__.config.type.value + ) + dialect = ConnectionTypeDialectMapper.dialect_of(connection_type) lineages = get_lineage_by_query( self.metadata, query=query, service_name=source_elements[0], database_name=source_elements[1], schema_name=source_elements[2], + dialect=dialect, ) for lineage_request in lineages or []: yield lineage_request diff --git a/ingestion/src/metadata/ingestion/source/database/lineage_source.py b/ingestion/src/metadata/ingestion/source/database/lineage_source.py index 71f410a41cb..a072580104f 100644 --- a/ingestion/src/metadata/ingestion/source/database/lineage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/lineage_source.py @@ -18,6 +18,7 @@ from typing import Iterable, Iterator, Optional from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.type.tableQuery import TableQuery +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.database.query_parser_source import QueryParserSource @@ -93,6 +94,8 @@ class LineageSource(QueryParserSource, ABC): Based on the query logs, prepare the lineage and send it to the sink """ + connection_type = str(self.service_connection.type.value) + dialect = ConnectionTypeDialectMapper.dialect_of(connection_type) for table_query in self.get_table_query(): lineages = get_lineage_by_query( @@ -101,6 +104,7 @@ class LineageSource(QueryParserSource, ABC): service_name=table_query.serviceName, database_name=table_query.databaseName, schema_name=table_query.databaseSchema, + dialect=dialect, ) for lineage_request in lineages or []: diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py b/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py index d297ecec6fe..3c027836fb6 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py @@ -14,6 +14,7 @@ Postgres lineage module from typing import Iterable from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.ingestion.lineage.models import Dialect from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query from metadata.ingestion.source.database.lineage_source import LineageSource from metadata.ingestion.source.database.postgres.queries import POSTGRES_SQL_STATEMENT @@ -52,6 +53,7 @@ class PostgresLineageSource(PostgresQueryParserSource, LineageSource): service_name=table_query.serviceName, database_name=table_query.databaseName, schema_name=table_query.databaseSchema, + dialect=Dialect.POSTGRES, ) for lineage_request in lineages or []: diff --git a/ingestion/tests/unit/test_sql_lineage.py b/ingestion/tests/unit/test_sql_lineage.py index 9499cea7a87..1e8f4ce2251 100644 --- a/ingestion/tests/unit/test_sql_lineage.py +++ b/ingestion/tests/unit/test_sql_lineage.py @@ -15,8 +15,6 @@ sql lineage utils tests import uuid from unittest import TestCase -from sqllineage.core.models import Column - from metadata.generated.schema.entity.data.table import Table from metadata.ingestion.lineage.parser import LineageParser from metadata.ingestion.lineage.sql_lineage import (