Fix: Replace sqllineage with openmetadata-sqllineage (#9800)

* Replace sqllineage with openmetadata-sqllineage

* Fix checkstyle and failing test

* Move logic to retrieve dialect of a service type into a class

* Improve py-check message when it fails

* Updated mapper

* Update code after merge
This commit is contained in:
Nahuel 2023-01-19 14:56:29 +01:00 committed by GitHub
parent 87719cef01
commit ddff6e2875
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 231 additions and 23 deletions

View File

@ -88,7 +88,7 @@ jobs:
body: |
**The Python checkstyle failed.**
Please run `make py_format` in the root of your repository and commit the changes to this PR.
Please run `make py_format` and `py_format_check` in the root of your repository and commit the changes to this PR.
You can also use [pre-commit](https://pre-commit.com/) to automate the Python code formatting.
You can install the pre-commit hooks with `make install_test precommit_install`.

View File

@ -94,7 +94,7 @@ base_requirements = {
"requests-aws4auth~=1.1", # Only depends on requests as external package. Leaving as base.
"setuptools~=65.6.3",
"sqlalchemy>=1.4.0",
"sqllineage==1.3.7",
"openmetadata-sqllineage==1.0.0",
"typing-compat~=0.1.0", # compatibility requirements for 3.7
"typing-inspect",
"wheel~=0.38.4",

View File

@ -132,7 +132,11 @@ class Workflow(WorkflowStatusMixin):
processor_class = import_processor_class(processor_type=processor_type)
processor_config = self.config.processor.dict().get("config", {})
self.processor: Processor = processor_class.create(
processor_config, metadata_config
processor_config,
metadata_config,
connection_type=str(
self.config.source.serviceConnection.__root__.config.type.value
),
)
logger.debug(
f"Processor Type: {processor_type}, {processor_class} configured"

View File

@ -0,0 +1,110 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Models related to lineage parsing
"""
from enum import Enum
from typing import Dict
from metadata.generated.schema.entity.services.connections.database.athenaConnection import (
AthenaType,
)
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigqueryType,
)
from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import (
ClickhouseType,
)
from metadata.generated.schema.entity.services.connections.database.db2Connection import (
Db2Type,
)
from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
DeltaLakeType,
)
from metadata.generated.schema.entity.services.connections.database.hiveConnection import (
HiveType,
)
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MySQLType,
)
from metadata.generated.schema.entity.services.connections.database.oracleConnection import (
OracleType,
)
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
PostgresType,
)
from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
RedshiftType,
)
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeType,
)
from metadata.generated.schema.entity.services.connections.database.sqliteConnection import (
SQLiteType,
)
class Dialect(Enum):
"""
Supported dialects by sqllineage
"""
ANSI = "ansi"
ATHENA = "athena"
BIGQUERY = "bigquery"
CLICKHOUSE = "clickhouse"
DB2 = "db2"
DUCKDB = "duckdb"
EXASOL = "exasol"
HIVE = "hive"
MATERIALIZE = "materialize"
MYSQL = "mysql"
ORACLE = "oracle"
POSTGRES = "postgres"
REDSHIFT = "redshift"
SNOWFLAKE = "snowflake"
SOQL = "soql"
SPARKSQL = "sparksql"
SQLITE = "sqlite"
TERADATA = "teradata"
TSQL = "tsql"
MAP_CONNECTION_TYPE_DIALECT: Dict[str, Dialect] = {
str(AthenaType.Athena.value): Dialect.ATHENA,
str(BigqueryType.BigQuery.value): Dialect.BIGQUERY,
str(ClickhouseType.Clickhouse.value): Dialect.CLICKHOUSE,
str(Db2Type.Db2.value): Dialect.DB2,
str(HiveType.Hive.value): Dialect.HIVE,
str(MySQLType.Mysql.value): Dialect.MYSQL,
str(OracleType.Oracle.value): Dialect.ORACLE,
str(PostgresType.Postgres.value): Dialect.POSTGRES,
str(RedshiftType.Redshift.value): Dialect.REDSHIFT,
str(SnowflakeType.Snowflake.value): Dialect.SNOWFLAKE,
str(DeltaLakeType.DeltaLake.value): Dialect.SPARKSQL,
str(SQLiteType.SQLite.value): Dialect.SQLITE,
}
class ConnectionTypeDialectMapper:
"""
Auxiliary class to handle the mapping between a connection type and a dialect used to analyze lineage
"""
@staticmethod
def dialect_of(connection_type: str) -> Dialect:
"""
Returns dialect for a given connection_type
Args:
connection_type: the connection type as string
Returns: a dialect
"""
return MAP_CONNECTION_TYPE_DIALECT.get(connection_type, Dialect.ANSI)

View File

@ -15,12 +15,13 @@ import traceback
from collections import defaultdict
from copy import deepcopy
from logging.config import DictConfigurator
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple, Union
from cached_property import cached_property
from sqlparse.sql import Comparison, Identifier, Statement
from metadata.generated.schema.type.tableUsageCount import TableColumn, TableColumnJoin
from metadata.ingestion.lineage.models import Dialect
from metadata.utils.helpers import (
find_in_iter,
get_formatted_entity_name,
@ -34,9 +35,10 @@ from metadata.utils.logger import ingestion_logger
# pylint: disable=wrong-import-position
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.core.models import Column, Table
from sqllineage.core.models import Column, Schema, Table
from sqllineage.exceptions import SQLLineageException
from sqllineage.runner import LineageRunner
from sqllineage.sqlfluff_core.models import SqlFluffTable
# Reverting changes after import is done
DictConfigurator.configure = configure
@ -53,10 +55,10 @@ class LineageParser:
query: str
_clean_query: str
def __init__(self, query: str):
def __init__(self, query: str, dialect: Dialect = Dialect.ANSI):
self.query = query
self._clean_query = self.clean_raw_query(query)
self.parser = LineageRunner(self._clean_query)
self.parser = self._evaluate_best_parser(self._clean_query, dialect=dialect)
@cached_property
def involved_tables(self) -> Optional[List[Table]]:
@ -104,11 +106,20 @@ class LineageParser:
return self.retrieve_tables(self.parser.target_tables)
@cached_property
def column_lineage(self) -> List[Tuple[Column, Column]]:
def column_lineage(self) -> List[Union[Tuple[Column, Column]]]:
"""
Get a list of tuples of column lineage
"""
return self.parser.get_column_lineage()
if self.parser._use_sqlparse: # pylint: disable=protected-access
return self.parser.get_column_lineage()
column_lineage = []
for src_column, tgt_column in self.parser.get_column_lineage():
src_col = Column(src_column.raw_name)
src_col._parent = src_column._parent # pylint: disable=protected-access
tgt_col = Column(tgt_column.raw_name)
tgt_col._parent = tgt_column._parent # pylint: disable=protected-access
column_lineage.append((src_col, tgt_col))
return column_lineage
@cached_property
def clean_table_list(self) -> List[str]:
@ -297,7 +308,9 @@ class LineageParser:
if not self._clean_query:
return []
return [
self.clean_table_name(table) for table in tables if isinstance(table, Table)
self.clean_table_name(table)
for table in tables
if isinstance(table, (Table, SqlFluffTable))
]
@classmethod
@ -334,7 +347,7 @@ class LineageParser:
return clean_query.strip()
@staticmethod
def clean_table_name(table: Table) -> Table:
def clean_table_name(table: Union[Table, SqlFluffTable]) -> Table:
"""
Clean table name by:
- Removing brackets from the beginning and end of the table and schema name
@ -345,7 +358,14 @@ class LineageParser:
Returns:
Copy of the table object with cleaned names
"""
clean_table = deepcopy(table)
# keep using Table object
if isinstance(table, SqlFluffTable):
clean_table = Table("")
clean_table.raw_name = table.raw_name
clean_table.alias = table.alias
clean_table.schema = Schema(table.schema.raw_name)
else:
clean_table = deepcopy(table)
if insensitive_match(clean_table.raw_name, r"\[.*\]"):
clean_table.raw_name = insensitive_replace(
clean_table.raw_name, r"\[(.*)\]", r"\1"
@ -357,3 +377,49 @@ class LineageParser:
clean_table.schema.raw_name, r"\[(.*)\]", r"\1"
)
return clean_table
@staticmethod
def _evaluate_best_parser(
query: str, dialect: Dialect = Dialect.ANSI
) -> LineageRunner:
sqlfluff_count = 0
try:
lr_sqlfluff = LineageRunner(
query, dialect=dialect.value, use_sqlparse=False
)
sqlfluff_count = len(lr_sqlfluff.get_column_lineage()) + len(
set(lr_sqlfluff.source_tables).union(
set(lr_sqlfluff.target_tables).union(
set(lr_sqlfluff.intermediate_tables)
)
)
)
except Exception as exc:
logger.warning(
f"Lineage with SqlFluff failed for the query [{query}]: {exc}"
)
lr_sqlfluff = None
sqlparser_count = 0
lr_sqlparser = LineageRunner(query, dialect=dialect.value)
try:
sqlparser_count = len(lr_sqlparser.get_column_lineage()) + len(
set(lr_sqlparser.source_tables).union(
set(lr_sqlparser.target_tables).union(
set(lr_sqlparser.intermediate_tables)
)
)
)
except Exception:
# if both runner have failed we return the usual one
return lr_sqlfluff if lr_sqlfluff else lr_sqlparser
if lr_sqlfluff:
# if sqlparser retrieve more lineage info that sqlfluff
if sqlparser_count > sqlfluff_count:
logger.debug(
f"Lineage computed with SqlFluff did not perform as expected for query: [{query}]"
)
return lr_sqlparser
return lr_sqlfluff
return lr_sqlparser

View File

@ -22,6 +22,7 @@ from metadata.generated.schema.type.entityLineage import (
LineageDetails,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.lineage.models import Dialect
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
@ -325,6 +326,7 @@ def get_lineage_by_query(
database_name: Optional[str],
schema_name: Optional[str],
query: str,
dialect: Dialect,
) -> Optional[Iterator[AddLineageRequest]]:
"""
This method parses the query to get source, target and intermediate table names to create lineage,
@ -334,7 +336,7 @@ def get_lineage_by_query(
try:
logger.debug(f"Running lineage with query: {query}")
lineage_parser = LineageParser(query)
lineage_parser = LineageParser(query, dialect)
raw_column_lineage = lineage_parser.column_lineage
column_lineage.update(populate_column_lineage_map(raw_column_lineage))
@ -387,6 +389,7 @@ def get_lineage_via_table_entity(
schema_name: str,
service_name: str,
query: str,
dialect: Dialect,
) -> Optional[Iterator[AddLineageRequest]]:
"""Get lineage from table entity
@ -397,6 +400,7 @@ def get_lineage_via_table_entity(
schema_name (str): name of the schema
service_name (str): name of the service
query (str): query used for lineage
dialect (str): dialect used for lineage
Returns:
Optional[Iterator[AddLineageRequest]]
@ -408,7 +412,7 @@ def get_lineage_via_table_entity(
try:
logger.debug(f"Getting lineage via table entity using query: {query}")
lineage_parser = LineageParser(query)
lineage_parser = LineageParser(query, dialect)
to_table_name = table_entity.name.__root__
for from_table_name in lineage_parser.source_tables:

View File

@ -14,7 +14,7 @@ Python API REST wrapper and helpers
import datetime
import time
import traceback
from typing import Callable, List, Optional, Union
from typing import Callable, Dict, List, Optional, Union
import requests
from requests.exceptions import HTTPError
@ -165,7 +165,7 @@ class REST:
# Example: "Proxy-Authorization": "%(Authorization)s"
# This will result in the Authorization value being set for the Proxy-Authorization Extra Header
if self.config.extra_headers:
extra_headers: dict[str, str] = self.config.extra_headers
extra_headers: Dict[str, str] = self.config.extra_headers
extra_headers = {k: (v % headers) for k, v in extra_headers.items()}
logger.debug("Extra headers provided '%s'", extra_headers)
headers = {**headers, **extra_headers}

View File

@ -23,18 +23,20 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
from metadata.generated.schema.type.queryParserData import ParsedData, QueryParserData
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.ingestion.api.processor import Processor, ProcessorStatus
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.parser import LineageParser
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
def parse_sql_statement(record: TableQuery) -> Optional[ParsedData]:
def parse_sql_statement(record: TableQuery, dialect: Dialect) -> Optional[ParsedData]:
"""
Use the lineage parser and work with the tokens
to convert a RAW SQL statement into
QueryParserData.
:param record: TableQuery from usage
:param dialect: dialect used to compute lineage
:return: QueryParserData
"""
@ -44,7 +46,7 @@ def parse_sql_statement(record: TableQuery) -> Optional[ParsedData]:
str(record.analysisDate), "%Y-%m-%d %H:%M:%S"
).date()
lineage_parser = LineageParser(record.query)
lineage_parser = LineageParser(record.query, dialect=dialect)
if not lineage_parser.involved_tables:
return None
@ -69,6 +71,7 @@ class QueryParserProcessor(Processor):
Args:
config (QueryParserProcessorConfig):
metadata_config (MetadataServerConfig):
connection_type (str):
Attributes:
config (QueryParserProcessorConfig):
@ -83,18 +86,21 @@ class QueryParserProcessor(Processor):
self,
config: ConfigModel,
metadata_config: OpenMetadataConnection,
connection_type: str,
):
self.config = config
self.metadata_config = metadata_config
self.status = ProcessorStatus()
self.connection_type = connection_type
@classmethod
def create(
cls, config_dict: dict, metadata_config: OpenMetadataConnection, **kwargs
):
config = ConfigModel.parse_obj(config_dict)
return cls(config, metadata_config)
connection_type = kwargs.pop("connection_type", "")
return cls(config, metadata_config, connection_type)
def process( # pylint: disable=arguments-differ
self, queries: TableQueries
@ -103,7 +109,10 @@ class QueryParserProcessor(Processor):
data = []
for record in queries.queries:
try:
parsed_sql = parse_sql_statement(record)
parsed_sql = parse_sql_statement(
record,
ConnectionTypeDialectMapper.dialect_of(self.connection_type),
)
if parsed_sql:
data.append(parsed_sql)
except Exception as exc:

View File

@ -39,6 +39,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import (
get_lineage_by_query,
@ -420,7 +421,9 @@ class CommonDbSourceService(
)
try:
lineage_parser = LineageParser(view_definition)
connection_type = str(self.service_connection.type.value)
dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
lineage_parser = LineageParser(view_definition, dialect)
if lineage_parser.source_tables and lineage_parser.target_tables:
yield from get_lineage_by_query(
self.metadata,
@ -428,6 +431,7 @@ class CommonDbSourceService(
service_name=self.context.database_service.name.__root__,
database_name=db_name,
schema_name=schema_name,
dialect=dialect,
) or []
else:
@ -438,6 +442,7 @@ class CommonDbSourceService(
database_name=db_name,
schema_name=schema_name,
query=view_definition,
dialect=dialect,
) or []
except Exception as exc:
logger.debug(traceback.format_exc())

View File

@ -61,6 +61,7 @@ from metadata.generated.schema.type.tagLabel import (
TagSource,
)
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
@ -537,12 +538,17 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
query = (
f"create table {query_fqn} as {data_model_link.datamodel.sql.__root__}"
)
connection_type = str(
self.config.serviceConnection.__root__.config.type.value
)
dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
lineages = get_lineage_by_query(
self.metadata,
query=query,
service_name=source_elements[0],
database_name=source_elements[1],
schema_name=source_elements[2],
dialect=dialect,
)
for lineage_request in lineages or []:
yield lineage_request

View File

@ -18,6 +18,7 @@ from typing import Iterable, Iterator, Optional
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
@ -93,6 +94,8 @@ class LineageSource(QueryParserSource, ABC):
Based on the query logs, prepare the lineage
and send it to the sink
"""
connection_type = str(self.service_connection.type.value)
dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
for table_query in self.get_table_query():
lineages = get_lineage_by_query(
@ -101,6 +104,7 @@ class LineageSource(QueryParserSource, ABC):
service_name=table_query.serviceName,
database_name=table_query.databaseName,
schema_name=table_query.databaseSchema,
dialect=dialect,
)
for lineage_request in lineages or []:

View File

@ -14,6 +14,7 @@ Postgres lineage module
from typing import Iterable
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.ingestion.lineage.models import Dialect
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.postgres.queries import POSTGRES_SQL_STATEMENT
@ -52,6 +53,7 @@ class PostgresLineageSource(PostgresQueryParserSource, LineageSource):
service_name=table_query.serviceName,
database_name=table_query.databaseName,
schema_name=table_query.databaseSchema,
dialect=Dialect.POSTGRES,
)
for lineage_request in lineages or []:

View File

@ -15,8 +15,6 @@ sql lineage utils tests
import uuid
from unittest import TestCase
from sqllineage.core.models import Column
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import (