diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/lineage.py b/ingestion/src/metadata/ingestion/source/database/bigquery/lineage.py index bdd2c345576..51fd0eff82e 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/lineage.py @@ -29,7 +29,3 @@ class BigqueryLineageSource(BigqueryQueryParserSource, LineageSource): filters = """ AND statement_type IN ("INSERT", "MERGE", "CREATE_TABLE_AS_SELECT", "UPDATE") """ - - database_field = "project_id" - - schema_field = "" # schema filtering not available diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/query_parser.py b/ingestion/src/metadata/ingestion/source/database/bigquery/query_parser.py index 2ff6c84ee1e..eb883ea1854 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/query_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/query_parser.py @@ -58,7 +58,7 @@ class BigqueryQueryParserSource(QueryParserSource, ABC): start_time=start_time, end_time=end_time, region=self.service_connection.usageLocation, - filters=self.filters, # pylint: disable=no-member + filters=self.filters, result_limit=self.source_config.resultLimit, ) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/usage.py b/ingestion/src/metadata/ingestion/source/database/bigquery/usage.py index 2da25210bb2..7729f7a1b7b 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/usage.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/usage.py @@ -29,7 +29,3 @@ class BigqueryUsageSource(BigqueryQueryParserSource, UsageSource): filters = """ AND statement_type = "SELECT" """ - - database_field = "project_id" - - schema_field = "" # schema filtering not available diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/lineage.py b/ingestion/src/metadata/ingestion/source/database/mssql/lineage.py index 034cf17db76..82e06099784 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/lineage.py @@ -21,7 +21,3 @@ class MssqlLineageSource(MssqlQueryParserSource, LineageSource): sql_stmt = MSSQL_SQL_STATEMENT filters = "" # No filtering in the queries - - database_field = "db.NAME" - - schema_field = "" # schema filtering not available diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/usage.py b/ingestion/src/metadata/ingestion/source/database/mssql/usage.py index 5443d3d6cfc..bcff04974c4 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql/usage.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/usage.py @@ -21,7 +21,3 @@ class MssqlUsageSource(MssqlQueryParserSource, UsageSource): sql_stmt = MSSQL_SQL_STATEMENT filters = "" # No filtering in the queries - - database_field = "db.NAME" - - schema_field = "" # schema filtering not available diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py b/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py index 3c027836fb6..afc36491f4e 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/lineage.py @@ -37,8 +37,6 @@ class PostgresLineageSource(PostgresQueryParserSource, LineageSource): OR s.query ILIKE '%%insert%%' ) """ - database_field = "d.datname" - schema_field = "" # schema filtering not available def next_record(self) -> Iterable[AddLineageRequest]: """ diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/query_parser.py b/ingestion/src/metadata/ingestion/source/database/postgres/query_parser.py index b6bcc30373c..4858134558a 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/query_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/query_parser.py @@ -70,7 +70,7 @@ class PostgresQueryParserSource(QueryParserSource, ABC): """ return self.sql_stmt.format( result_limit=self.config.sourceConfig.config.resultLimit, - filters=self.filters, # pylint: disable=no-member + filters=self.filters, ) def get_table_query(self) -> Iterable[TableQuery]: diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/usage.py b/ingestion/src/metadata/ingestion/source/database/postgres/usage.py index 86d5b9a038f..52a0a8f3035 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/usage.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/usage.py @@ -25,5 +25,3 @@ class PostgresUsageSource(PostgresQueryParserSource, UsageSource): sql_stmt = POSTGRES_SQL_STATEMENT filters = "" - database_field = "d.datname" - schema_field = "" # schema filtering not available diff --git a/ingestion/src/metadata/ingestion/source/database/query_parser_source.py b/ingestion/src/metadata/ingestion/source/database/query_parser_source.py index a6f4bb4de4c..3086520e89d 100644 --- a/ingestion/src/metadata/ingestion/source/database/query_parser_source.py +++ b/ingestion/src/metadata/ingestion/source/database/query_parser_source.py @@ -13,10 +13,9 @@ Usage Souce Module """ from abc import ABC, abstractmethod from datetime import datetime -from typing import Iterator, List, Optional, Union +from typing import Iterator, Optional, Union from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) @@ -61,31 +60,8 @@ class QueryParserSource(Source[Union[TableQuery, AddLineageRequest]], ABC): def prepare(self): """ - Fetch queries only from DB that is ingested in OM + By default, there's nothing to prepare """ - databases: List[Database] = self.metadata.list_all_entities( - Database, ["databaseSchemas"], params={"service": self.config.serviceName} - ) - database_name_list = [] - schema_name_list = [] - - for database in databases: - database_name_list.append(database.name.__root__) - if self.schema_field and database.databaseSchemas: - for schema in database.databaseSchemas.__root__: - schema_name_list.append(schema.name) - - if self.database_field and database_name_list: - self.filters += ( # pylint: disable=no-member - f" AND {self.database_field} IN ('" - + "','".join(database_name_list) - + "')" - ) - - if self.schema_field and schema_name_list: - self.filters += ( # pylint: disable=no-member - f" AND {self.schema_field} IN ('" + "','".join(schema_name_list) + "')" - ) @abstractmethod def get_table_query(self) -> Optional[Iterator[TableQuery]]: @@ -123,7 +99,7 @@ class QueryParserSource(Source[Union[TableQuery, AddLineageRequest]], ABC): return self.sql_stmt.format( start_time=start_time, end_time=end_time, - filters=self.filters, # pylint: disable=no-member + filters=self.filters, result_limit=self.source_config.resultLimit, ) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/lineage.py b/ingestion/src/metadata/ingestion/source/database/redshift/lineage.py index a8e3f355715..1fcdfa76819 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/lineage.py @@ -47,9 +47,3 @@ class RedshiftLineageSource(RedshiftQueryParserSource, LineageSource): """ sql_stmt = REDSHIFT_SQL_STATEMENT - - database_field = "database_name" - - schema_field = "schema_name" - - db_filters = "" diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/queries.py b/ingestion/src/metadata/ingestion/source/database/redshift/queries.py index dfd9fbc4a5a..362e0170a2d 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/queries.py @@ -79,8 +79,6 @@ REDSHIFT_SQL_STATEMENT = textwrap.dedent( ON s.query = fq.query INNER JOIN pg_catalog.pg_user AS u ON q.userid = u.usesysid - WHERE - {db_filters} ORDER BY q.endtime DESC """ ) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/query_parser.py b/ingestion/src/metadata/ingestion/source/database/redshift/query_parser.py index f4af743c200..8a70265454c 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/query_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/query_parser.py @@ -13,9 +13,7 @@ Redshift usage module """ from abc import ABC from datetime import datetime -from typing import List -from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.services.connections.database.redshiftConnection import ( RedshiftConnection, ) @@ -38,7 +36,6 @@ class RedshiftQueryParserSource(QueryParserSource, ABC): """ filters: str - db_filters: str @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): @@ -50,32 +47,6 @@ class RedshiftQueryParserSource(QueryParserSource, ABC): ) return cls(config, metadata_config) - def prepare(self): - """ - Fetch queries only from DB that is ingested in OM - """ - databases: List[Database] = self.metadata.list_all_entities( - Database, ["databaseSchemas"], params={"service": self.config.serviceName} - ) - database_name_list = [] - schema_name_list = [] - - for database in databases: - database_name_list.append(database.name.__root__) - if self.schema_field and database.databaseSchemas: - for schema in database.databaseSchemas.__root__: - schema_name_list.append(schema.name) - - if self.database_field and database_name_list: - self.db_filters += ( # pylint: disable=no-member - f"{self.database_field} IN ('" + "','".join(database_name_list) + "')" - ) - - if self.schema_field and schema_name_list: - self.db_filters += ( # pylint: disable=no-member - f" AND {self.schema_field} IN ('" + "','".join(schema_name_list) + "')" - ) - def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str: """ returns sql statement to fetch query logs @@ -83,7 +54,6 @@ class RedshiftQueryParserSource(QueryParserSource, ABC): return self.sql_stmt.format( start_time=start_time, end_time=end_time, - filters=self.filters, # pylint: disable=no-member - db_filters=self.db_filters, # pylint: disable=no-member + filters=self.filters, result_limit=self.source_config.resultLimit, ) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/usage.py b/ingestion/src/metadata/ingestion/source/database/redshift/usage.py index 97a5954f83b..77f33c13d29 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/usage.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/usage.py @@ -29,9 +29,3 @@ class RedshiftUsageSource(RedshiftQueryParserSource, UsageSource): """ sql_stmt = REDSHIFT_SQL_STATEMENT - - database_field = "database_name" - - schema_field = "schema_name" - - db_filters = "" diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py b/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py index c81248014fa..11c2c5f1cb6 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py @@ -29,7 +29,3 @@ class SnowflakeLineageSource(SnowflakeQueryParserSource, LineageSource): filters = """ AND QUERY_TYPE IN ('INSERT', 'MERGE', 'UPDATE','CREATE_TABLE_AS_SELECT') """ - - database_field = "database_name" - - schema_field = "schema_name" diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/query_parser.py b/ingestion/src/metadata/ingestion/source/database/snowflake/query_parser.py index 9343e3ff5a0..4e0a59b2ac0 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/query_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/query_parser.py @@ -67,7 +67,7 @@ class SnowflakeQueryParserSource(QueryParserSource, ABC): start_time=start_time, end_time=end_time, result_limit=self.config.sourceConfig.config.resultLimit, - filters=self.filters, # pylint: disable=no-member + filters=self.filters, ) def set_session_query_tag(self) -> None: diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/usage.py b/ingestion/src/metadata/ingestion/source/database/snowflake/usage.py index 46fc35f643a..cb60f4f8f7c 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/usage.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/usage.py @@ -32,7 +32,3 @@ class SnowflakeUsageSource(SnowflakeQueryParserSource, UsageSource): ALTER_SESSION','COPY','UPDATE','COMMIT','SHOW','ALTER','DESCRIBE','CREATE_TABLE','PUT_FILES','GET_FILES', 'INSERT', 'MERGE', 'CREATE_TABLE_AS_SELECT') """ - - database_field = "database_name" - - schema_field = "schema_name" diff --git a/ingestion/tests/unit/test_usage_filter.py b/ingestion/tests/unit/test_usage_filter.py index f5e45591a89..0aa4f1ed61f 100644 --- a/ingestion/tests/unit/test_usage_filter.py +++ b/ingestion/tests/unit/test_usage_filter.py @@ -30,68 +30,9 @@ from metadata.generated.schema.type.entityReference import ( ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.clickhouse.usage import ClickhouseUsageSource -from metadata.ingestion.source.database.mssql.usage import MssqlUsageSource -from metadata.ingestion.source.database.redshift.usage import RedshiftUsageSource T = TypeVar("T", bound=BaseModel) - -mock_mssql_config = { - "source": { - "type": "mssql-usage", - "serviceName": "local_mssql", - "serviceConnection": { - "config": { - "type": "Mssql", - "username": "usernames", - "password": "password", - "hostPort": "localhost:1433", - } - }, - "sourceConfig": {"config": {"queryLogDuration": "1"}}, - }, - "stage": {"type": "table-usage", "config": {"filename": "/tmp/mssql_usage"}}, - "bulkSink": {"type": "metadata-usage", "config": {"filename": "/tmp/mssql_usage"}}, - "workflowConfig": { - "openMetadataServerConfig": { - "hostPort": "http://localhost:8585/api", - "authProvider": "openmetadata", - "securityConfig": { - "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" - }, - } - }, -} - - -mock_redshift_config = { - "source": { - "type": "redshift-usage", - "serviceName": "local_redshift", - "serviceConnection": { - "config": { - "hostPort": "localhost:5439", - "username": "usernames", - "password": "password", - "type": "Redshift", - "database": "dev", - } - }, - "sourceConfig": {"config": {"queryLogDuration": "1"}}, - }, - "stage": {"type": "table-usage", "config": {"filename": "/tmp/mssql_usage"}}, - "bulkSink": {"type": "metadata-usage", "config": {"filename": "/tmp/mssql_usage"}}, - "workflowConfig": { - "openMetadataServerConfig": { - "hostPort": "http://localhost:8585/api", - "authProvider": "openmetadata", - "securityConfig": { - "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" - }, - } - }, -} - mock_clickhouse_config = { "source": { "type": "clickhouse-usage", @@ -195,11 +136,6 @@ def mock_list_entities( ] -EXPECTED_MSSQL_FILTER = " AND db.NAME IN ('test_db_1','test_db_2')" - -EXPECTED_REDSHIFT_FILTER = """database_name IN ('test_db_1','test_db_2') AND schema_name IN ('test_schema_1','test_schema_2','test_schema_3','test_schema_4','test_schema_5','test_schema_6')""" - - EXPECTED_CLICKHOUSE_FILTER = """ and query_kind = 'Select' AND hasAny(databases, ['test_schema_1','test_schema_2','test_schema_3','test_schema_4','test_schema_5','test_schema_6'])""" @@ -210,26 +146,6 @@ class UsageQueryFilterTests(TestCase): Usage filter tests for database and schema filters """ - @patch.object(OpenMetadata, "list_all_entities", mock_list_entities) - def test_prepare_mssql(self): - config = OpenMetadataWorkflowConfig.parse_obj(mock_mssql_config) - mssql_source = MssqlUsageSource.create( - mock_mssql_config["source"], - config.workflowConfig.openMetadataServerConfig, - ) - mssql_source.prepare() - assert mssql_source.filters == EXPECTED_MSSQL_FILTER - - @patch.object(OpenMetadata, "list_all_entities", mock_list_entities) - def test_prepare_redshift(self): - config = OpenMetadataWorkflowConfig.parse_obj(mock_redshift_config) - redshift_source = RedshiftUsageSource.create( - mock_redshift_config["source"], - config.workflowConfig.openMetadataServerConfig, - ) - redshift_source.prepare() - assert redshift_source.db_filters == EXPECTED_REDSHIFT_FILTER - @patch.object(OpenMetadata, "list_all_entities", mock_list_entities) def test_prepare_clickhouse(self): config = OpenMetadataWorkflowConfig.parse_obj(mock_clickhouse_config)