Fix #10292: Remove DB & Schema Usage Filters (#10378)

* Fix #10292: Remove DB & Schema Usage Filters

* Fix Lint

* lint
This commit is contained in:
Mayur Singal 2023-03-03 11:09:55 +05:30 committed by GitHub
parent 6e6722b5e1
commit 92fee5cf4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 7 additions and 187 deletions

View File

@@ -29,7 +29,3 @@ class BigqueryLineageSource(BigqueryQueryParserSource, LineageSource):
filters = """
AND statement_type IN ("INSERT", "MERGE", "CREATE_TABLE_AS_SELECT", "UPDATE")
"""
database_field = "project_id"
schema_field = "" # schema filtering not available

View File

@@ -58,7 +58,7 @@ class BigqueryQueryParserSource(QueryParserSource, ABC):
start_time=start_time,
end_time=end_time,
region=self.service_connection.usageLocation,
filters=self.filters, # pylint: disable=no-member
filters=self.filters,
result_limit=self.source_config.resultLimit,
)

View File

@@ -29,7 +29,3 @@ class BigqueryUsageSource(BigqueryQueryParserSource, UsageSource):
filters = """
AND statement_type = "SELECT"
"""
database_field = "project_id"
schema_field = "" # schema filtering not available

View File

@@ -21,7 +21,3 @@ class MssqlLineageSource(MssqlQueryParserSource, LineageSource):
sql_stmt = MSSQL_SQL_STATEMENT
filters = "" # No filtering in the queries
database_field = "db.NAME"
schema_field = "" # schema filtering not available

View File

@@ -21,7 +21,3 @@ class MssqlUsageSource(MssqlQueryParserSource, UsageSource):
sql_stmt = MSSQL_SQL_STATEMENT
filters = "" # No filtering in the queries
database_field = "db.NAME"
schema_field = "" # schema filtering not available

View File

@@ -37,8 +37,6 @@ class PostgresLineageSource(PostgresQueryParserSource, LineageSource):
OR s.query ILIKE '%%insert%%'
)
"""
database_field = "d.datname"
schema_field = "" # schema filtering not available
def next_record(self) -> Iterable[AddLineageRequest]:
"""

View File

@@ -70,7 +70,7 @@ class PostgresQueryParserSource(QueryParserSource, ABC):
"""
return self.sql_stmt.format(
result_limit=self.config.sourceConfig.config.resultLimit,
filters=self.filters, # pylint: disable=no-member
filters=self.filters,
)
def get_table_query(self) -> Iterable[TableQuery]:

View File

@@ -25,5 +25,3 @@ class PostgresUsageSource(PostgresQueryParserSource, UsageSource):
sql_stmt = POSTGRES_SQL_STATEMENT
filters = ""
database_field = "d.datname"
schema_field = "" # schema filtering not available

View File

@@ -13,10 +13,9 @@ Usage Souce Module
"""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Iterator, List, Optional, Union
from typing import Iterator, Optional, Union
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
@@ -61,31 +60,8 @@ class QueryParserSource(Source[Union[TableQuery, AddLineageRequest]], ABC):
def prepare(self):
"""
Fetch queries only from DB that is ingested in OM
By default, there's nothing to prepare
"""
databases: List[Database] = self.metadata.list_all_entities(
Database, ["databaseSchemas"], params={"service": self.config.serviceName}
)
database_name_list = []
schema_name_list = []
for database in databases:
database_name_list.append(database.name.__root__)
if self.schema_field and database.databaseSchemas:
for schema in database.databaseSchemas.__root__:
schema_name_list.append(schema.name)
if self.database_field and database_name_list:
self.filters += ( # pylint: disable=no-member
f" AND {self.database_field} IN ('"
+ "','".join(database_name_list)
+ "')"
)
if self.schema_field and schema_name_list:
self.filters += ( # pylint: disable=no-member
f" AND {self.schema_field} IN ('" + "','".join(schema_name_list) + "')"
)
@abstractmethod
def get_table_query(self) -> Optional[Iterator[TableQuery]]:
@@ -123,7 +99,7 @@ class QueryParserSource(Source[Union[TableQuery, AddLineageRequest]], ABC):
return self.sql_stmt.format(
start_time=start_time,
end_time=end_time,
filters=self.filters, # pylint: disable=no-member
filters=self.filters,
result_limit=self.source_config.resultLimit,
)

View File

@@ -47,9 +47,3 @@ class RedshiftLineageSource(RedshiftQueryParserSource, LineageSource):
"""
sql_stmt = REDSHIFT_SQL_STATEMENT
database_field = "database_name"
schema_field = "schema_name"
db_filters = ""

View File

@@ -79,8 +79,6 @@ REDSHIFT_SQL_STATEMENT = textwrap.dedent(
ON s.query = fq.query
INNER JOIN pg_catalog.pg_user AS u
ON q.userid = u.usesysid
WHERE
{db_filters}
ORDER BY q.endtime DESC
"""
)

View File

@@ -13,9 +13,7 @@ Redshift usage module
"""
from abc import ABC
from datetime import datetime
from typing import List
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
RedshiftConnection,
)
@@ -38,7 +36,6 @@ class RedshiftQueryParserSource(QueryParserSource, ABC):
"""
filters: str
db_filters: str
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@@ -50,32 +47,6 @@ class RedshiftQueryParserSource(QueryParserSource, ABC):
)
return cls(config, metadata_config)
def prepare(self):
"""
Fetch queries only from DB that is ingested in OM
"""
databases: List[Database] = self.metadata.list_all_entities(
Database, ["databaseSchemas"], params={"service": self.config.serviceName}
)
database_name_list = []
schema_name_list = []
for database in databases:
database_name_list.append(database.name.__root__)
if self.schema_field and database.databaseSchemas:
for schema in database.databaseSchemas.__root__:
schema_name_list.append(schema.name)
if self.database_field and database_name_list:
self.db_filters += ( # pylint: disable=no-member
f"{self.database_field} IN ('" + "','".join(database_name_list) + "')"
)
if self.schema_field and schema_name_list:
self.db_filters += ( # pylint: disable=no-member
f" AND {self.schema_field} IN ('" + "','".join(schema_name_list) + "')"
)
def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str:
"""
returns sql statement to fetch query logs
@@ -83,7 +54,6 @@ class RedshiftQueryParserSource(QueryParserSource, ABC):
return self.sql_stmt.format(
start_time=start_time,
end_time=end_time,
filters=self.filters, # pylint: disable=no-member
db_filters=self.db_filters, # pylint: disable=no-member
filters=self.filters,
result_limit=self.source_config.resultLimit,
)

View File

@@ -29,9 +29,3 @@ class RedshiftUsageSource(RedshiftQueryParserSource, UsageSource):
"""
sql_stmt = REDSHIFT_SQL_STATEMENT
database_field = "database_name"
schema_field = "schema_name"
db_filters = ""

View File

@@ -29,7 +29,3 @@ class SnowflakeLineageSource(SnowflakeQueryParserSource, LineageSource):
filters = """
AND QUERY_TYPE IN ('INSERT', 'MERGE', 'UPDATE','CREATE_TABLE_AS_SELECT')
"""
database_field = "database_name"
schema_field = "schema_name"

View File

@@ -67,7 +67,7 @@ class SnowflakeQueryParserSource(QueryParserSource, ABC):
start_time=start_time,
end_time=end_time,
result_limit=self.config.sourceConfig.config.resultLimit,
filters=self.filters, # pylint: disable=no-member
filters=self.filters,
)
def set_session_query_tag(self) -> None:

View File

@@ -32,7 +32,3 @@ class SnowflakeUsageSource(SnowflakeQueryParserSource, UsageSource):
ALTER_SESSION','COPY','UPDATE','COMMIT','SHOW','ALTER','DESCRIBE','CREATE_TABLE','PUT_FILES','GET_FILES',
'INSERT', 'MERGE', 'CREATE_TABLE_AS_SELECT')
"""
database_field = "database_name"
schema_field = "schema_name"

View File

@@ -30,68 +30,9 @@ from metadata.generated.schema.type.entityReference import (
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.clickhouse.usage import ClickhouseUsageSource
from metadata.ingestion.source.database.mssql.usage import MssqlUsageSource
from metadata.ingestion.source.database.redshift.usage import RedshiftUsageSource
T = TypeVar("T", bound=BaseModel)
mock_mssql_config = {
"source": {
"type": "mssql-usage",
"serviceName": "local_mssql",
"serviceConnection": {
"config": {
"type": "Mssql",
"username": "usernames",
"password": "password",
"hostPort": "localhost:1433",
}
},
"sourceConfig": {"config": {"queryLogDuration": "1"}},
},
"stage": {"type": "table-usage", "config": {"filename": "/tmp/mssql_usage"}},
"bulkSink": {"type": "metadata-usage", "config": {"filename": "/tmp/mssql_usage"}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
mock_redshift_config = {
"source": {
"type": "redshift-usage",
"serviceName": "local_redshift",
"serviceConnection": {
"config": {
"hostPort": "localhost:5439",
"username": "usernames",
"password": "password",
"type": "Redshift",
"database": "dev",
}
},
"sourceConfig": {"config": {"queryLogDuration": "1"}},
},
"stage": {"type": "table-usage", "config": {"filename": "/tmp/mssql_usage"}},
"bulkSink": {"type": "metadata-usage", "config": {"filename": "/tmp/mssql_usage"}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
mock_clickhouse_config = {
"source": {
"type": "clickhouse-usage",
@@ -195,11 +136,6 @@ def mock_list_entities(
]
EXPECTED_MSSQL_FILTER = " AND db.NAME IN ('test_db_1','test_db_2')"
EXPECTED_REDSHIFT_FILTER = """database_name IN ('test_db_1','test_db_2') AND schema_name IN ('test_schema_1','test_schema_2','test_schema_3','test_schema_4','test_schema_5','test_schema_6')"""
EXPECTED_CLICKHOUSE_FILTER = """
and query_kind = 'Select'
AND hasAny(databases, ['test_schema_1','test_schema_2','test_schema_3','test_schema_4','test_schema_5','test_schema_6'])"""
@@ -210,26 +146,6 @@ class UsageQueryFilterTests(TestCase):
Usage filter tests for database and schema filters
"""
@patch.object(OpenMetadata, "list_all_entities", mock_list_entities)
def test_prepare_mssql(self):
config = OpenMetadataWorkflowConfig.parse_obj(mock_mssql_config)
mssql_source = MssqlUsageSource.create(
mock_mssql_config["source"],
config.workflowConfig.openMetadataServerConfig,
)
mssql_source.prepare()
assert mssql_source.filters == EXPECTED_MSSQL_FILTER
@patch.object(OpenMetadata, "list_all_entities", mock_list_entities)
def test_prepare_redshift(self):
config = OpenMetadataWorkflowConfig.parse_obj(mock_redshift_config)
redshift_source = RedshiftUsageSource.create(
mock_redshift_config["source"],
config.workflowConfig.openMetadataServerConfig,
)
redshift_source.prepare()
assert redshift_source.db_filters == EXPECTED_REDSHIFT_FILTER
@patch.object(OpenMetadata, "list_all_entities", mock_list_entities)
def test_prepare_clickhouse(self):
config = OpenMetadataWorkflowConfig.parse_obj(mock_clickhouse_config)