diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/lineage.py b/ingestion/src/metadata/ingestion/source/database/bigquery/lineage.py index 51fd0eff82e..bdd2c345576 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/lineage.py @@ -29,3 +29,7 @@ class BigqueryLineageSource(BigqueryQueryParserSource, LineageSource): filters = """ AND statement_type IN ("INSERT", "MERGE", "CREATE_TABLE_AS_SELECT", "UPDATE") """ + + database_field = "project_id" + + schema_field = "" # schema filtering not available diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/query_parser.py b/ingestion/src/metadata/ingestion/source/database/bigquery/query_parser.py index eb883ea1854..2ff6c84ee1e 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/query_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/query_parser.py @@ -58,7 +58,7 @@ class BigqueryQueryParserSource(QueryParserSource, ABC): start_time=start_time, end_time=end_time, region=self.service_connection.usageLocation, - filters=self.filters, + filters=self.filters, # pylint: disable=no-member result_limit=self.source_config.resultLimit, ) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/usage.py b/ingestion/src/metadata/ingestion/source/database/bigquery/usage.py index 7729f7a1b7b..2da25210bb2 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/usage.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/usage.py @@ -29,3 +29,7 @@ class BigqueryUsageSource(BigqueryQueryParserSource, UsageSource): filters = """ AND statement_type = "SELECT" """ + + database_field = "project_id" + + schema_field = "" # schema filtering not available diff --git a/ingestion/src/metadata/ingestion/source/database/clickhouse/lineage.py b/ingestion/src/metadata/ingestion/source/database/clickhouse/lineage.py index 
def prepare(self):
    """
    Restrict the query-log SQL to schemas already ingested in OM.

    Lists every ``Database`` of the configured service together with its
    ``databaseSchemas`` and, when at least one schema name is found,
    appends an ``AND hasAny(<schema_field>, [...])`` clause to
    ``self.filters``. ``database_field`` is not used by this override.
    Nothing is fetched or appended when ``schema_field`` is empty.
    """

    def _literal(name):
        # Escape backslashes first, then single quotes, so a schema name
        # cannot terminate the string literal early or start an escape.
        escaped = name.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    if not self.schema_field:
        # No column to filter on, so the metadata lookup would be wasted.
        return

    databases: List[Database] = self.metadata.list_all_entities(
        Database, ["databaseSchemas"], params={"service": self.config.serviceName}
    )

    schema_names = []
    for database in databases:
        if not database.databaseSchemas:
            continue
        # NOTE(review): assumes a schema reference may come without a name;
        # such entries are skipped instead of breaking the join below.
        schema_names.extend(
            schema.name for schema in database.databaseSchemas.__root__ if schema.name
        )

    if not schema_names:
        return

    schema_literals = ",".join(_literal(name) for name in schema_names)
    self.filters += (  # pylint: disable=no-member
        f" AND hasAny({self.schema_field}, [{schema_literals}])"
    )
a/ingestion/src/metadata/ingestion/source/database/mssql/usage.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/usage.py @@ -21,3 +21,7 @@ class MssqlUsageSource(MssqlQueryParserSource, UsageSource): sql_stmt = MSSQL_SQL_STATEMENT filters = "" # No filtering in the queries + + database_field = "db.NAME" + + schema_field = "" # schema filtering not available diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py b/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py index 3ea860079dc..d297ecec6fe 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py @@ -36,6 +36,8 @@ class PostgresLineageSource(PostgresQueryParserSource, LineageSource): OR s.query ILIKE '%%insert%%' ) """ + database_field = "d.datname" + schema_field = "" # schema filtering not available def next_record(self) -> Iterable[AddLineageRequest]: """ diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/query_parser.py b/ingestion/src/metadata/ingestion/source/database/postgres/query_parser.py index 4858134558a..b6bcc30373c 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/query_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/query_parser.py @@ -70,7 +70,7 @@ class PostgresQueryParserSource(QueryParserSource, ABC): """ return self.sql_stmt.format( result_limit=self.config.sourceConfig.config.resultLimit, - filters=self.filters, + filters=self.filters, # pylint: disable=no-member ) def get_table_query(self) -> Iterable[TableQuery]: diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/usage.py b/ingestion/src/metadata/ingestion/source/database/postgres/usage.py index 52a0a8f3035..86d5b9a038f 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/usage.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/usage.py @@ -25,3 +25,5 @@ class 
def prepare(self):
    """
    Restrict the query-log SQL to databases and schemas ingested in OM.

    Lists every ``Database`` of the configured service together with its
    ``databaseSchemas`` and appends one ``AND <field> IN (...)`` clause to
    ``self.filters`` for ``database_field`` and one for ``schema_field``.
    A clause is left out when its field name is empty or when no names
    were collected for it.
    """

    def _in_clause(field, names):
        # Standard SQL escaping: double embedded single quotes so a name
        # cannot close the string literal early.
        # NOTE(review): dialects that only accept backslash escapes would
        # need their own override -- confirm per connector.
        quoted = ",".join("'" + name.replace("'", "''") + "'" for name in names)
        return f" AND {field} IN ({quoted})"

    databases: List[Database] = self.metadata.list_all_entities(
        Database, ["databaseSchemas"], params={"service": self.config.serviceName}
    )

    database_names = []
    schema_names = []
    for database in databases:
        database_names.append(database.name.__root__)
        if self.schema_field and database.databaseSchemas:
            # NOTE(review): assumes a schema reference may come without a
            # name; such entries are skipped instead of breaking the join.
            schema_names.extend(
                schema.name
                for schema in database.databaseSchemas.__root__
                if schema.name
            )

    if self.database_field and database_names:
        self.filters += _in_clause(  # pylint: disable=no-member
            self.database_field, database_names
        )

    if self.schema_field and schema_names:
        self.filters += _in_clause(  # pylint: disable=no-member
            self.schema_field, schema_names
        )
def prepare(self):
    """
    Build the Redshift ``WHERE`` condition from what is ingested in OM.

    Lists every ``Database`` of the configured service together with its
    ``databaseSchemas`` and stores in ``self.db_filters`` the conditions
    ``<database_field> IN (...)`` and ``<schema_field> IN (...)`` joined
    with ``AND``. A condition is left out when its field name is empty or
    no names were collected for it.

    The statement template places ``db_filters`` directly after ``WHERE``,
    so the value must always be a complete boolean expression: when no
    condition applies it falls back to ``1 = 1`` (no restriction). The
    value is rebuilt on every call rather than appended to, so calling
    ``prepare`` again cannot glue two conditions together.
    """

    def _in_condition(field, names):
        # Standard SQL escaping: double embedded single quotes so a name
        # cannot close the string literal early.
        quoted = ",".join("'" + name.replace("'", "''") + "'" for name in names)
        return f"{field} IN ({quoted})"

    databases: List[Database] = self.metadata.list_all_entities(
        Database, ["databaseSchemas"], params={"service": self.config.serviceName}
    )

    database_names = []
    schema_names = []
    for database in databases:
        database_names.append(database.name.__root__)
        if self.schema_field and database.databaseSchemas:
            # NOTE(review): assumes a schema reference may come without a
            # name; such entries are skipped instead of breaking the join.
            schema_names.extend(
                schema.name
                for schema in database.databaseSchemas.__root__
                if schema.name
            )

    conditions = []
    if self.database_field and database_names:
        conditions.append(_in_condition(self.database_field, database_names))
    if self.schema_field and schema_names:
        conditions.append(_in_condition(self.schema_field, schema_names))

    # Joining the collected parts guarantees the expression never starts
    # with a dangling "AND" when only the schema condition is present.
    self.db_filters = " AND ".join(conditions) or "1 = 1"
db_filters=self.db_filters, # pylint: disable=no-member result_limit=self.source_config.resultLimit, ) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/usage.py b/ingestion/src/metadata/ingestion/source/database/redshift/usage.py index 77f33c13d29..97a5954f83b 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/usage.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/usage.py @@ -29,3 +29,9 @@ class RedshiftUsageSource(RedshiftQueryParserSource, UsageSource): """ sql_stmt = REDSHIFT_SQL_STATEMENT + + database_field = "database_name" + + schema_field = "schema_name" + + db_filters = "" diff --git a/ingestion/src/metadata/ingestion/source/database/sample_usage.py b/ingestion/src/metadata/ingestion/source/database/sample_usage.py index e41d69be971..a50b83fcf68 100644 --- a/ingestion/src/metadata/ingestion/source/database/sample_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/sample_usage.py @@ -45,6 +45,10 @@ class SampleUsageSource(UsageSource): service_type = DatabaseServiceType.BigQuery.value + database_field = "" # filtering not required + + schema_field = "" # filtering not required + def __init__( self, config: WorkflowSource, metadata_config: OpenMetadataConnection ): # pylint: disable=super-init-not-called diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py b/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py index 11c2c5f1cb6..c81248014fa 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py @@ -29,3 +29,7 @@ class SnowflakeLineageSource(SnowflakeQueryParserSource, LineageSource): filters = """ AND QUERY_TYPE IN ('INSERT', 'MERGE', 'UPDATE','CREATE_TABLE_AS_SELECT') """ + + database_field = "database_name" + + schema_field = "schema_name" diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/query_parser.py 
b/ingestion/src/metadata/ingestion/source/database/snowflake/query_parser.py index 4e0a59b2ac0..9343e3ff5a0 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/query_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/query_parser.py @@ -67,7 +67,7 @@ class SnowflakeQueryParserSource(QueryParserSource, ABC): start_time=start_time, end_time=end_time, result_limit=self.config.sourceConfig.config.resultLimit, - filters=self.filters, + filters=self.filters, # pylint: disable=no-member ) def set_session_query_tag(self) -> None: diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/usage.py b/ingestion/src/metadata/ingestion/source/database/snowflake/usage.py index cb60f4f8f7c..46fc35f643a 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/usage.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/usage.py @@ -32,3 +32,7 @@ class SnowflakeUsageSource(SnowflakeQueryParserSource, UsageSource): ALTER_SESSION','COPY','UPDATE','COMMIT','SHOW','ALTER','DESCRIBE','CREATE_TABLE','PUT_FILES','GET_FILES', 'INSERT', 'MERGE', 'CREATE_TABLE_AS_SELECT') """ + + database_field = "database_name" + + schema_field = "schema_name" diff --git a/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py b/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py index ad66e6fadd4..2169f3f734a 100644 --- a/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py +++ b/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py @@ -274,6 +274,7 @@ class SqlColumnHandlerMixin: return col_raw_type.length if col_raw_type.length else 1 except AttributeError: return 1 + return 1 return None @staticmethod diff --git a/ingestion/tests/unit/test_usage_filter.py b/ingestion/tests/unit/test_usage_filter.py new file mode 100644 index 00000000000..f5e45591a89 --- /dev/null +++ b/ingestion/tests/unit/test_usage_filter.py @@ -0,0 +1,241 @@ +# Copyright 2021 
Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=line-too-long,unused-argument + +""" +Usage query database and schema filter tests +""" + +from typing import Dict, List, Optional, Type, TypeVar +from unittest import TestCase +from unittest.mock import patch + +from pydantic import BaseModel + +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.generated.schema.type.entityReference import ( + EntityReference, + EntityReferenceList, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.database.clickhouse.usage import ClickhouseUsageSource +from metadata.ingestion.source.database.mssql.usage import MssqlUsageSource +from metadata.ingestion.source.database.redshift.usage import RedshiftUsageSource + +T = TypeVar("T", bound=BaseModel) + + +mock_mssql_config = { + "source": { + "type": "mssql-usage", + "serviceName": "local_mssql", + "serviceConnection": { + "config": { + "type": "Mssql", + "username": "usernames", + "password": "password", + "hostPort": "localhost:1433", + } + }, + "sourceConfig": {"config": {"queryLogDuration": "1"}}, + }, + "stage": {"type": "table-usage", "config": {"filename": "/tmp/mssql_usage"}}, + "bulkSink": {"type": "metadata-usage", "config": {"filename": "/tmp/mssql_usage"}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": 
"http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, + } + }, +} + + +mock_redshift_config = { + "source": { + "type": "redshift-usage", + "serviceName": "local_redshift", + "serviceConnection": { + "config": { + "hostPort": "localhost:5439", + "username": "usernames", + "password": "password", + "type": "Redshift", + "database": "dev", + } + }, + "sourceConfig": {"config": {"queryLogDuration": "1"}}, + }, + "stage": {"type": "table-usage", "config": {"filename": "/tmp/mssql_usage"}}, + "bulkSink": {"type": "metadata-usage", "config": {"filename": "/tmp/mssql_usage"}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, + } + }, +} + +mock_clickhouse_config = { 
+ "source": { + "type": "clickhouse-usage", + "serviceName": "local_clickhouse", + "serviceConnection": { + "config": { + "hostPort": "localhost:8123", + "username": "usernames", + "password": "password", + "type": "Clickhouse", + "databaseSchema": "default", + } + }, + "sourceConfig": {"config": {"queryLogDuration": "1"}}, + }, + "stage": {"type": "table-usage", "config": {"filename": "/tmp/mssql_usage"}}, + "bulkSink": {"type": "metadata-usage", "config": {"filename": "/tmp/mssql_usage"}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, + } + }, +} + + +def mock_list_entities( + self, + entity: Type[T], + fields: Optional[List[str]] = None, + limit: int = 1000, + params: Optional[Dict[str, str]] = None, +): + """ + mock list entities for databases + """ + schema_list1 = EntityReferenceList( + __root__=[ + EntityReference( + id="73129df3-96ed-476d-a9b5-b92091264649", + name="test_schema_1", + type="databaseSchema", + ), + EntityReference( + id="73129df3-96ed-476d-a9b5-b92091264649", + name="test_schema_2", + type="databaseSchema", + ), + EntityReference( + id="73129df3-96ed-476d-a9b5-b92091264649", + name="test_schema_3", + type="databaseSchema", + ), + ] + ) + + schema_list2 = EntityReferenceList( + __root__=[ + EntityReference( + id="73129df3-96ed-476d-a9b5-b92091264649", 
+ name="test_schema_4", + type="databaseSchema", + ), + EntityReference( + id="73129df3-96ed-476d-a9b5-b92091264649", + name="test_schema_5", + type="databaseSchema", + ), + EntityReference( + id="73129df3-96ed-476d-a9b5-b92091264649", + name="test_schema_6", + type="databaseSchema", + ), + ] + ) + + return [ + Database( + id="6a9b1ade-dff1-4a14-89f9-7842f1e06270", + name="test_db_1", + service=EntityReference( + id="73129df3-96ed-476d-a9b5-b92091264649", + name="test_db_service", + type="databaseService", + ), + databaseSchemas=schema_list1, + ), + Database( + id="6a9b1ade-dff1-4a14-89f9-7842f1e06270", + name="test_db_2", + service=EntityReference( + id="73129df3-96ed-476d-a9b5-b92091264649", + name="test_db_service", + type="databaseService", + ), + databaseSchemas=schema_list2, + ), + ] + + +EXPECTED_MSSQL_FILTER = " AND db.NAME IN ('test_db_1','test_db_2')" + +EXPECTED_REDSHIFT_FILTER = """database_name IN ('test_db_1','test_db_2') AND schema_name IN ('test_schema_1','test_schema_2','test_schema_3','test_schema_4','test_schema_5','test_schema_6')""" + + +EXPECTED_CLICKHOUSE_FILTER = """ + and query_kind = 'Select' + AND hasAny(databases, ['test_schema_1','test_schema_2','test_schema_3','test_schema_4','test_schema_5','test_schema_6'])""" + + +class UsageQueryFilterTests(TestCase): + """ + Usage filter tests for database and schema filters + """ + + @patch.object(OpenMetadata, "list_all_entities", mock_list_entities) + def test_prepare_mssql(self): + config = OpenMetadataWorkflowConfig.parse_obj(mock_mssql_config) + mssql_source = MssqlUsageSource.create( + mock_mssql_config["source"], + config.workflowConfig.openMetadataServerConfig, + ) + mssql_source.prepare() + assert mssql_source.filters == EXPECTED_MSSQL_FILTER + + @patch.object(OpenMetadata, "list_all_entities", mock_list_entities) + def test_prepare_redshift(self): + config = OpenMetadataWorkflowConfig.parse_obj(mock_redshift_config) + redshift_source = RedshiftUsageSource.create( + 
mock_redshift_config["source"], + config.workflowConfig.openMetadataServerConfig, + ) + redshift_source.prepare() + assert redshift_source.db_filters == EXPECTED_REDSHIFT_FILTER + + @patch.object(OpenMetadata, "list_all_entities", mock_list_entities) + def test_prepare_clickhouse(self): + config = OpenMetadataWorkflowConfig.parse_obj(mock_clickhouse_config) + clickhouse_source = ClickhouseUsageSource.create( + mock_clickhouse_config["source"], + config.workflowConfig.openMetadataServerConfig, + ) + clickhouse_source.prepare() + assert clickhouse_source.filters == EXPECTED_CLICKHOUSE_FILTER diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json index 5a2e2b00bba..458d59b2849 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json @@ -35,18 +35,6 @@ "queryLogFilePath": { "description": "Configuration to set the file path for query logs", "type": "string" - }, - "schemaFilterPattern": { - "description": "Regex to only fetch tables or databases that matches the pattern.", - "$ref": "../type/filterPattern.json#/definitions/filterPattern" - }, - "tableFilterPattern": { - "description": "Regex exclude tables or databases that matches the pattern.", - "$ref": "../type/filterPattern.json#/definitions/filterPattern" - }, - "databaseFilterPattern": { - "description": "Regex to only fetch databases that matches the pattern.", - "$ref": "../type/filterPattern.json#/definitions/filterPattern" } }, "additionalProperties": false