Fix #6829: Improve usage filtering (#9613)

This commit is contained in:
Mayur Singal 2023-01-05 20:37:03 +05:30 committed by GitHub
parent 485e4986b9
commit 9ef32eef99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 390 additions and 20 deletions

View File

@ -29,3 +29,7 @@ class BigqueryLineageSource(BigqueryQueryParserSource, LineageSource):
filters = """
AND statement_type IN ("INSERT", "MERGE", "CREATE_TABLE_AS_SELECT", "UPDATE")
"""
database_field = "project_id"
schema_field = "" # schema filtering not available

View File

@ -58,7 +58,7 @@ class BigqueryQueryParserSource(QueryParserSource, ABC):
start_time=start_time,
end_time=end_time,
region=self.service_connection.usageLocation,
filters=self.filters,
filters=self.filters, # pylint: disable=no-member
result_limit=self.source_config.resultLimit,
)

View File

@ -29,3 +29,7 @@ class BigqueryUsageSource(BigqueryQueryParserSource, UsageSource):
filters = """
AND statement_type = "SELECT"
"""
database_field = "project_id"
schema_field = "" # schema filtering not available

View File

@ -31,3 +31,7 @@ class ClickhouseLineageSource(ClickhouseQueryParserSource, LineageSource):
filters = """
and query_kind in ('Create', 'Insert')
"""
database_field = ""
schema_field = "databases"

View File

@ -15,7 +15,9 @@ Clickhouse usage module
import ast
from abc import ABC
from datetime import datetime
from typing import List
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import (
ClickhouseConnection,
)
@ -62,6 +64,29 @@ class ClickhouseQueryParserSource(QueryParserSource, ABC):
return self.sql_stmt.format(
start_time=start_time,
end_time=end_time,
filters=self.filters,
filters=self.filters, # pylint: disable=no-member
result_limit=self.source_config.resultLimit,
)
def prepare(self):
    """
    Restrict fetched query logs to schemas already ingested in OpenMetadata.

    Clickhouse exposes the databases touched by a query as an array column
    (``schema_field`` here names that column), so the filter uses hasAny()
    rather than a plain ``IN`` clause.
    """
    databases: List[Database] = self.metadata.list_all_entities(
        Database, ["databaseSchemas"], params={"service": self.config.serviceName}
    )
    # Only schema names are needed: database_field is empty for Clickhouse,
    # so the previous accumulation of database names was dead code.
    schema_name_list = []
    for database in databases:
        if self.schema_field and database.databaseSchemas:
            for schema in database.databaseSchemas.__root__:
                schema_name_list.append(schema.name)
    if self.schema_field and schema_name_list:
        self.filters += (  # pylint: disable=no-member
            f" AND hasAny({self.schema_field}, ['"
            + "','".join(schema_name_list)
            + "'])"
        )

View File

@ -31,3 +31,7 @@ class ClickhouseUsageSource(ClickhouseQueryParserSource, UsageSource):
filters = """
and query_kind = 'Select'
"""
database_field = ""
schema_field = "databases"

View File

@ -62,3 +62,8 @@ class DatabricksQueryParserSource(QueryParserSource, ABC):
f"Expected DatabricksConnection, but got {connection}"
)
return cls(config, metadata_config)
def prepare(self):
    """
    By default, there's nothing to prepare
    """
    # NOTE(review): intentional no-op — presumably overrides a base-class
    # prepare() that builds DB/schema filters not applicable to Databricks;
    # confirm against QueryParserSource.prepare.

View File

@ -21,3 +21,7 @@ class MssqlLineageSource(MssqlQueryParserSource, LineageSource):
sql_stmt = MSSQL_SQL_STATEMENT
filters = "" # No filtering in the queries
database_field = "db.NAME"
schema_field = "" # schema filtering not available

View File

@ -21,3 +21,7 @@ class MssqlUsageSource(MssqlQueryParserSource, UsageSource):
sql_stmt = MSSQL_SQL_STATEMENT
filters = "" # No filtering in the queries
database_field = "db.NAME"
schema_field = "" # schema filtering not available

View File

@ -36,6 +36,8 @@ class PostgresLineageSource(PostgresQueryParserSource, LineageSource):
OR s.query ILIKE '%%insert%%'
)
"""
database_field = "d.datname"
schema_field = "" # schema filtering not available
def next_record(self) -> Iterable[AddLineageRequest]:
"""

View File

@ -70,7 +70,7 @@ class PostgresQueryParserSource(QueryParserSource, ABC):
"""
return self.sql_stmt.format(
result_limit=self.config.sourceConfig.config.resultLimit,
filters=self.filters,
filters=self.filters, # pylint: disable=no-member
)
def get_table_query(self) -> Iterable[TableQuery]:

View File

@ -25,3 +25,5 @@ class PostgresUsageSource(PostgresQueryParserSource, UsageSource):
sql_stmt = POSTGRES_SQL_STATEMENT
filters = ""
database_field = "d.datname"
schema_field = "" # schema filtering not available

View File

@ -13,9 +13,10 @@ Usage Source Module
"""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Iterator, Optional, Union
from typing import Iterator, List, Optional, Union
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
@ -45,6 +46,8 @@ class QueryParserSource(Source[Union[TableQuery, AddLineageRequest]], ABC):
sql_stmt: str
filters: str
database_field: str
schema_field: str
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
self.config = config
@ -58,8 +61,31 @@ class QueryParserSource(Source[Union[TableQuery, AddLineageRequest]], ABC):
def prepare(self):
    """
    Fetch queries only from the databases/schemas that are ingested in
    OpenMetadata, by appending ``IN`` clauses to ``self.filters``.
    """
    databases: List[Database] = self.metadata.list_all_entities(
        Database, ["databaseSchemas"], params={"service": self.config.serviceName}
    )
    database_name_list = []
    schema_name_list = []
    for database in databases:
        database_name_list.append(database.name.__root__)
        # Schemas are only collected when this source supports schema filtering.
        if self.schema_field and database.databaseSchemas:
            for schema in database.databaseSchemas.__root__:
                schema_name_list.append(schema.name)
    if self.database_field and database_name_list:
        self.filters += (  # pylint: disable=no-member
            f" AND {self.database_field} IN ('"
            + "','".join(database_name_list)
            + "')"
        )
    if self.schema_field and schema_name_list:
        self.filters += (  # pylint: disable=no-member
            f" AND {self.schema_field} IN ('" + "','".join(schema_name_list) + "')"
        )
@abstractmethod
def get_table_query(self) -> Optional[Iterator[TableQuery]]:
@ -97,7 +123,7 @@ class QueryParserSource(Source[Union[TableQuery, AddLineageRequest]], ABC):
return self.sql_stmt.format(
start_time=start_time,
end_time=end_time,
filters=self.filters,
filters=self.filters, # pylint: disable=no-member
result_limit=self.source_config.resultLimit,
)

View File

@ -47,3 +47,9 @@ class RedshiftLineageSource(RedshiftQueryParserSource, LineageSource):
"""
sql_stmt = REDSHIFT_SQL_STATEMENT
database_field = "database_name"
schema_field = "schema_name"
db_filters = ""

View File

@ -72,6 +72,8 @@ REDSHIFT_SQL_STATEMENT = textwrap.dedent(
ON s.query = fq.query
INNER JOIN pg_catalog.pg_user AS u
ON q.userid = u.usesysid
WHERE
{db_filters}
ORDER BY q.endtime DESC
"""
)

View File

@ -13,7 +13,9 @@ Redshift usage module
"""
from abc import ABC
from datetime import datetime
from typing import List
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
RedshiftConnection,
)
@ -36,6 +38,7 @@ class RedshiftQueryParserSource(QueryParserSource, ABC):
"""
filters: str
db_filters: str
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@ -47,6 +50,32 @@ class RedshiftQueryParserSource(QueryParserSource, ABC):
)
return cls(config, metadata_config)
def prepare(self):
    """
    Restrict query-log fetching to databases/schemas already ingested in
    OpenMetadata by building the WHERE conditions stored in ``db_filters``.
    """
    databases: List[Database] = self.metadata.list_all_entities(
        Database, ["databaseSchemas"], params={"service": self.config.serviceName}
    )
    database_name_list = []
    schema_name_list = []
    for database in databases:
        database_name_list.append(database.name.__root__)
        if self.schema_field and database.databaseSchemas:
            for schema in database.databaseSchemas.__root__:
                schema_name_list.append(schema.name)
    conditions = []
    if self.database_field and database_name_list:
        conditions.append(
            f"{self.database_field} IN ('" + "','".join(database_name_list) + "')"
        )
    if self.schema_field and schema_name_list:
        conditions.append(
            f"{self.schema_field} IN ('" + "','".join(schema_name_list) + "')"
        )
    # The SQL template is "WHERE {db_filters} ORDER BY ...": leaving
    # db_filters empty (no ingested databases) would yield invalid SQL
    # ("WHERE ORDER BY"), so fall back to a tautology in that case.
    self.db_filters += " AND ".join(conditions) if conditions else "1=1"
def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str:
"""
returns sql statement to fetch query logs
@ -54,6 +83,7 @@ class RedshiftQueryParserSource(QueryParserSource, ABC):
return self.sql_stmt.format(
start_time=start_time,
end_time=end_time,
filters=self.filters,
filters=self.filters, # pylint: disable=no-member
db_filters=self.db_filters, # pylint: disable=no-member
result_limit=self.source_config.resultLimit,
)

View File

@ -29,3 +29,9 @@ class RedshiftUsageSource(RedshiftQueryParserSource, UsageSource):
"""
sql_stmt = REDSHIFT_SQL_STATEMENT
database_field = "database_name"
schema_field = "schema_name"
db_filters = ""

View File

@ -45,6 +45,10 @@ class SampleUsageSource(UsageSource):
service_type = DatabaseServiceType.BigQuery.value
database_field = "" # filtering not required
schema_field = "" # filtering not required
def __init__(
self, config: WorkflowSource, metadata_config: OpenMetadataConnection
): # pylint: disable=super-init-not-called

View File

@ -29,3 +29,7 @@ class SnowflakeLineageSource(SnowflakeQueryParserSource, LineageSource):
filters = """
AND QUERY_TYPE IN ('INSERT', 'MERGE', 'UPDATE','CREATE_TABLE_AS_SELECT')
"""
database_field = "database_name"
schema_field = "schema_name"

View File

@ -67,7 +67,7 @@ class SnowflakeQueryParserSource(QueryParserSource, ABC):
start_time=start_time,
end_time=end_time,
result_limit=self.config.sourceConfig.config.resultLimit,
filters=self.filters,
filters=self.filters, # pylint: disable=no-member
)
def set_session_query_tag(self) -> None:

View File

@ -32,3 +32,7 @@ class SnowflakeUsageSource(SnowflakeQueryParserSource, UsageSource):
ALTER_SESSION','COPY','UPDATE','COMMIT','SHOW','ALTER','DESCRIBE','CREATE_TABLE','PUT_FILES','GET_FILES',
'INSERT', 'MERGE', 'CREATE_TABLE_AS_SELECT')
"""
database_field = "database_name"
schema_field = "schema_name"

View File

@ -274,6 +274,7 @@ class SqlColumnHandlerMixin:
return col_raw_type.length if col_raw_type.length else 1
except AttributeError:
return 1
return 1
return None
@staticmethod

View File

@ -0,0 +1,241 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=line-too-long,unused-argument
"""
Usage query database and schema filter tests
"""
from typing import Dict, List, Optional, Type, TypeVar
from unittest import TestCase
from unittest.mock import patch
from pydantic import BaseModel
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityReference import (
EntityReference,
EntityReferenceList,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.clickhouse.usage import ClickhouseUsageSource
from metadata.ingestion.source.database.mssql.usage import MssqlUsageSource
from metadata.ingestion.source.database.redshift.usage import RedshiftUsageSource
T = TypeVar("T", bound=BaseModel)
mock_mssql_config = {
"source": {
"type": "mssql-usage",
"serviceName": "local_mssql",
"serviceConnection": {
"config": {
"type": "Mssql",
"username": "usernames",
"password": "password",
"hostPort": "localhost:1433",
}
},
"sourceConfig": {"config": {"queryLogDuration": "1"}},
},
"stage": {"type": "table-usage", "config": {"filename": "/tmp/mssql_usage"}},
"bulkSink": {"type": "metadata-usage", "config": {"filename": "/tmp/mssql_usage"}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
mock_redshift_config = {
"source": {
"type": "redshift-usage",
"serviceName": "local_redshift",
"serviceConnection": {
"config": {
"hostPort": "localhost:5439",
"username": "usernames",
"password": "password",
"type": "Redshift",
"database": "dev",
}
},
"sourceConfig": {"config": {"queryLogDuration": "1"}},
},
"stage": {"type": "table-usage", "config": {"filename": "/tmp/mssql_usage"}},
"bulkSink": {"type": "metadata-usage", "config": {"filename": "/tmp/mssql_usage"}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
mock_clickhouse_config = {
"source": {
"type": "clickhouse-usage",
"serviceName": "local_clickhouse",
"serviceConnection": {
"config": {
"hostPort": "localhost:8123",
"username": "usernames",
"password": "password",
"type": "Clickhouse",
"databaseSchema": "default",
}
},
"sourceConfig": {"config": {"queryLogDuration": "1"}},
},
"stage": {"type": "table-usage", "config": {"filename": "/tmp/mssql_usage"}},
"bulkSink": {"type": "metadata-usage", "config": {"filename": "/tmp/mssql_usage"}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
def mock_list_entities(
    self,
    entity: Type[T],
    fields: Optional[List[str]] = None,
    limit: int = 1000,
    params: Optional[Dict[str, str]] = None,
):
    """
    Mocked OpenMetadata.list_all_entities: returns two Database entities of
    the same service, each carrying three databaseSchema references.
    """
    ref_id = "73129df3-96ed-476d-a9b5-b92091264649"

    def _schemas(names: List[str]) -> EntityReferenceList:
        # Wrap plain schema names into databaseSchema entity references.
        return EntityReferenceList(
            __root__=[
                EntityReference(id=ref_id, name=schema_name, type="databaseSchema")
                for schema_name in names
            ]
        )

    def _database(db_name: str, schemas: EntityReferenceList) -> Database:
        # Build a Database entity attached to the shared test service.
        return Database(
            id="6a9b1ade-dff1-4a14-89f9-7842f1e06270",
            name=db_name,
            service=EntityReference(
                id=ref_id,
                name="test_db_service",
                type="databaseService",
            ),
            databaseSchemas=schemas,
        )

    return [
        _database(
            "test_db_1",
            _schemas(["test_schema_1", "test_schema_2", "test_schema_3"]),
        ),
        _database(
            "test_db_2",
            _schemas(["test_schema_4", "test_schema_5", "test_schema_6"]),
        ),
    ]
# Filter fragment MssqlUsageSource.prepare() appends (database names only;
# MSSQL has no schema filtering).
EXPECTED_MSSQL_FILTER = " AND db.NAME IN ('test_db_1','test_db_2')"
# db_filters built by RedshiftUsageSource.prepare(): database IN clause
# joined with the schema IN clause.
EXPECTED_REDSHIFT_FILTER = """database_name IN ('test_db_1','test_db_2') AND schema_name IN ('test_schema_1','test_schema_2','test_schema_3','test_schema_4','test_schema_5','test_schema_6')"""
# Clickhouse filters: the base SELECT-only filter plus a hasAny() over the
# schema names collected by ClickhouseQueryParserSource.prepare().
EXPECTED_CLICKHOUSE_FILTER = """
and query_kind = 'Select'
AND hasAny(databases, ['test_schema_1','test_schema_2','test_schema_3','test_schema_4','test_schema_5','test_schema_6'])"""
class UsageQueryFilterTests(TestCase):
    """
    Verify that each usage source's prepare() builds the expected
    database/schema filters, with entity listing mocked out.
    """

    @patch.object(OpenMetadata, "list_all_entities", mock_list_entities)
    def test_prepare_mssql(self):
        # prepare() should append a database IN-clause to `filters`.
        workflow = OpenMetadataWorkflowConfig.parse_obj(mock_mssql_config)
        server_config = workflow.workflowConfig.openMetadataServerConfig
        source = MssqlUsageSource.create(mock_mssql_config["source"], server_config)
        source.prepare()
        assert source.filters == EXPECTED_MSSQL_FILTER

    @patch.object(OpenMetadata, "list_all_entities", mock_list_entities)
    def test_prepare_redshift(self):
        # Redshift keeps its conditions in the separate `db_filters` slot.
        workflow = OpenMetadataWorkflowConfig.parse_obj(mock_redshift_config)
        server_config = workflow.workflowConfig.openMetadataServerConfig
        source = RedshiftUsageSource.create(
            mock_redshift_config["source"], server_config
        )
        source.prepare()
        assert source.db_filters == EXPECTED_REDSHIFT_FILTER

    @patch.object(OpenMetadata, "list_all_entities", mock_list_entities)
    def test_prepare_clickhouse(self):
        # Clickhouse filters schemas via hasAny() on its array column.
        workflow = OpenMetadataWorkflowConfig.parse_obj(mock_clickhouse_config)
        server_config = workflow.workflowConfig.openMetadataServerConfig
        source = ClickhouseUsageSource.create(
            mock_clickhouse_config["source"], server_config
        )
        source.prepare()
        assert source.filters == EXPECTED_CLICKHOUSE_FILTER

View File

@ -35,18 +35,6 @@
"queryLogFilePath": {
"description": "Configuration to set the file path for query logs",
"type": "string"
},
"schemaFilterPattern": {
"description": "Regex to only fetch tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"tableFilterPattern": {
"description": "Regex exclude tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"databaseFilterPattern": {
"description": "Regex to only fetch databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
}
},
"additionalProperties": false