Improve filtering for lineage query (#11457)

This commit is contained in:
Mayur Singal 2023-05-08 15:18:51 +05:30 committed by GitHub
parent 9d61d63ce4
commit a897954afc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 38 additions and 12 deletions

View File

@ -27,5 +27,8 @@ class BigqueryLineageSource(BigqueryQueryParserSource, LineageSource):
sql_stmt = BIGQUERY_STATEMENT sql_stmt = BIGQUERY_STATEMENT
filters = """ filters = """
AND statement_type IN ("INSERT", "MERGE", "CREATE_TABLE_AS_SELECT", "UPDATE") AND (
statement_type IN ("MERGE", "CREATE_TABLE_AS_SELECT", "UPDATE")
OR (statement_type = "INSERT" and UPPER(query) like '%%INSERT%%INTO%%SELECT%%')
)
""" """

View File

@ -29,7 +29,10 @@ class ClickhouseLineageSource(ClickhouseQueryParserSource, LineageSource):
sql_stmt = CLICKHOUSE_SQL_STATEMENT sql_stmt = CLICKHOUSE_SQL_STATEMENT
filters = """ filters = """
and query_kind in ('Create', 'Insert') and (
query_kind='Create'
or (query_kind='Insert' and query ilike '%%insert%%into%%select%%')
)
""" """
database_field = "" database_field = ""

View File

@ -19,4 +19,11 @@ from metadata.ingestion.source.database.mssql.query_parser import MssqlQueryPars
class MssqlLineageSource(MssqlQueryParserSource, LineageSource): class MssqlLineageSource(MssqlQueryParserSource, LineageSource):
sql_stmt = MSSQL_SQL_STATEMENT sql_stmt = MSSQL_SQL_STATEMENT
filters = "" # No filtering in the queries filters = """
AND (
lower(t.text) LIKE '%%select%%into%%'
OR lower(t.text) LIKE '%%insert%%into%%select%%'
OR lower(t.text) LIKE '%%update%%'
OR lower(t.text) LIKE '%%merge%%'
)
"""

View File

@ -34,7 +34,9 @@ class PostgresLineageSource(PostgresQueryParserSource, LineageSource):
filters = """ filters = """
AND ( AND (
s.query ILIKE '%%create table%%as%%select%%' s.query ILIKE '%%create table%%as%%select%%'
OR s.query ILIKE '%%insert%%' OR s.query ILIKE '%%insert%%into%%select%%'
OR s.query ILIKE '%%update%%'
OR s.query ILIKE '%%merge%%'
) )
""" """

View File

@ -40,8 +40,10 @@ from metadata.ingestion.source.database.redshift.query_parser import (
class RedshiftLineageSource(RedshiftQueryParserSource, LineageSource): class RedshiftLineageSource(RedshiftQueryParserSource, LineageSource):
filters = """ filters = """
AND ( AND (
querytxt ILIKE '%%create table%%as%%select%%' querytxt ILIKE '%%create%%table%%as%%select%%'
OR querytxt ILIKE '%%insert%%' OR querytxt ILIKE '%%insert%%into%%select%%'
OR querytxt ILIKE '%%update%%'
OR querytxt ILIKE '%%merge%%'
) )
""" """

View File

@ -20,10 +20,10 @@ from metadata.ingestion.source.database.usage_source import UsageSource
class RedshiftUsageSource(RedshiftQueryParserSource, UsageSource): class RedshiftUsageSource(RedshiftQueryParserSource, UsageSource):
filters = """ filters = """
AND querytxt NOT ILIKE 'fetch %%' AND querytxt NOT ILIKE 'fetch%%'
AND querytxt NOT ILIKE 'padb_fetch_sample: %%' AND querytxt NOT ILIKE 'padb_fetch_sample:%%'
AND querytxt NOT ILIKE 'Undoing %% transactions on table %% with current xid%%' AND querytxt NOT ILIKE 'Undoing%%transactions%%on%%table%%with%%current%%xid%%'
AND querytxt NOT ILIKE '%%create table%%as%%select%%' AND querytxt NOT ILIKE '%%create%%table%%as%%select%%'
AND querytxt NOT ILIKE '%%insert%%' AND querytxt NOT ILIKE '%%insert%%'
""" """

View File

@ -27,5 +27,8 @@ class SnowflakeLineageSource(SnowflakeQueryParserSource, LineageSource):
sql_stmt = SNOWFLAKE_SQL_STATEMENT sql_stmt = SNOWFLAKE_SQL_STATEMENT
filters = """ filters = """
AND QUERY_TYPE IN ('INSERT', 'MERGE', 'UPDATE','CREATE_TABLE_AS_SELECT') AND (
QUERY_TYPE IN ('MERGE', 'UPDATE','CREATE_TABLE_AS_SELECT')
OR (QUERY_TYPE = 'INSERT' and query_text ILIKE '%%insert%%into%%select%%')
)
""" """

View File

@ -24,7 +24,13 @@ logger = ingestion_logger()
class VerticaLineageSource(VerticaQueryParserSource, LineageSource): class VerticaLineageSource(VerticaQueryParserSource, LineageSource):
sql_stmt = VERTICA_SQL_STATEMENT sql_stmt = VERTICA_SQL_STATEMENT
filters = "AND query_type in ('INSERT', 'UPDATE', 'QUERY', 'DDL')" filters = """
AND (
query_type in ('UPDATE', 'DDL')
OR ( query_type IN ('INSERT','QUERY') and p.query ilike '%%INSERT%%INTO%%SELECT%%')
OR ( query_type = 'QUERY' and p.query not ilike '%%INSERT%%INTO%%')
)
"""
database_field = "DBNAME()" database_field = "DBNAME()"