From 831fce5b7e215effda22ea562f875d11cad6f9c6 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Thu, 18 Jan 2024 18:52:43 +0530 Subject: [PATCH] Fixes 10709: Add useFqnForFiltering to profiler workflow (#14717) --- .../src/metadata/profiler/source/metadata.py | 36 +++++++++-- .../metadata/profiler/source/metadata_ext.py | 59 ++++--------------- .../tests/unit/profiler/test_workflow.py | 46 ++++++++++++++- .../databaseServiceProfilerPipeline.json | 7 ++- 4 files changed, 93 insertions(+), 55 deletions(-) diff --git a/ingestion/src/metadata/profiler/source/metadata.py b/ingestion/src/metadata/profiler/source/metadata.py index 29ec8a0e679..8d9b9388a98 100644 --- a/ingestion/src/metadata/profiler/source/metadata.py +++ b/ingestion/src/metadata/profiler/source/metadata.py @@ -17,6 +17,7 @@ from typing import Iterable, Optional, cast from pydantic import BaseModel from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Table, TableType from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.ingestionPipelines.status import ( @@ -117,7 +118,6 @@ class OpenMetadataSource(Source): self.metadata.health_check() def _iter(self, *_, **__) -> Iterable[Either[ProfilerSourceAndEntity]]: - for database in self.get_database_entities(): try: profiler_source = profiler_source_factory.create( @@ -149,9 +149,17 @@ class OpenMetadataSource(Source): def filter_databases(self, database: Database) -> Optional[Database]: """Returns filtered database entities""" + database_fqn = fqn.build( + self.metadata, + entity_type=Database, + service_name=self.config.source.serviceName, + database_name=database.name.__root__, + ) if filter_by_database( self.source_config.databaseFilterPattern, - database.name.__root__, + database_fqn + if self.source_config.useFqnForFiltering + else 
database.name.__root__, ): self.status.filter(database.name.__root__, "Database pattern not allowed") return None @@ -166,18 +174,38 @@ class OpenMetadataSource(Source): """ for table in tables: try: + schema_fqn = fqn.build( + self.metadata, + entity_type=DatabaseSchema, + service_name=self.config.source.serviceName, + database_name=table.database.name, + schema_name=table.databaseSchema.name, + ) if filter_by_schema( self.source_config.schemaFilterPattern, - table.databaseSchema.name, # type: ignore + schema_fqn + if self.source_config.useFqnForFiltering + else table.databaseSchema.name, # type: ignore ): self.status.filter( f"Schema pattern not allowed: {table.fullyQualifiedName.__root__}", "Schema pattern not allowed", ) continue + table_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=self.config.source.serviceName, + database_name=table.database.name, + schema_name=table.databaseSchema.name, + table_name=table.name.__root__, + ) + if filter_by_table( self.source_config.tableFilterPattern, - table.name.__root__, + table_fqn + if self.source_config.useFqnForFiltering + else table.name.__root__, ): self.status.filter( f"Table pattern not allowed: {table.fullyQualifiedName.__root__}", diff --git a/ingestion/src/metadata/profiler/source/metadata_ext.py b/ingestion/src/metadata/profiler/source/metadata_ext.py index 55612aef61e..aac06f880e8 100644 --- a/ingestion/src/metadata/profiler/source/metadata_ext.py +++ b/ingestion/src/metadata/profiler/source/metadata_ext.py @@ -26,7 +26,7 @@ from typing import Iterable, cast from sqlalchemy.inspection import inspect from metadata.generated.schema.entity.data.database import Database -from metadata.generated.schema.entity.data.table import Table, TableType +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.ingestionPipelines.status import ( StackTraceError, ) @@ -197,8 +197,17 @@ class OpenMetadataSourceExt(OpenMetadataSource): else: 
database_names = self.source.get_database_names_raw() for database in database_names: + database_fqn = fqn.build( + self.metadata, + entity_type=Database, + service_name=self.config.source.serviceName, + database_name=database, + ) if filter_by_database( - self.source_config.databaseFilterPattern, database + self.source_config.databaseFilterPattern, + database_fqn + if self.source_config.useFqnForFiltering + else database, ): self.status.filter(database, "Database pattern not allowed") continue @@ -216,52 +225,6 @@ class OpenMetadataSourceExt(OpenMetadataSource): logger.debug(f"Failed to fetch database names {exc}") logger.debug(traceback.format_exc()) - def filter_entities(self, tables: Iterable[Table]) -> Iterable[Table]: - """ - From a list of tables, apply the SQLSourceConfig - filter patterns. - - We will update the status on the SQLSource Status. - """ - for table in tables: - try: - if filter_by_schema( - self.source_config.schemaFilterPattern, - table.databaseSchema.name, # type: ignore - ): - self.status.filter( - f"Schema pattern not allowed: {table.fullyQualifiedName.__root__}", - "Schema pattern not allowed", - ) - continue - if filter_by_table( - self.source_config.tableFilterPattern, - table.name.__root__, - ): - self.status.filter( - f"Table pattern not allowed: {table.fullyQualifiedName.__root__}", - "Table pattern not allowed", - ) - continue - if ( - table.tableType == TableType.View - and not self.source_config.includeViews - ): - self.status.filter( - table.fullyQualifiedName.__root__, - "View filtered out", - ) - continue - yield table - except Exception as exc: - self.status.failed( - StackTraceError( - name=table.fullyQualifiedName.__root__, - error=f"Unexpected error filtering entities for table [{table}]: {exc}", - stackTrace=traceback.format_exc(), - ) - ) - def get_table_entities(self, database): """ List and filter OpenMetadata tables based on the diff --git a/ingestion/tests/unit/profiler/test_workflow.py 
b/ingestion/tests/unit/profiler/test_workflow.py index f03cd07d93a..53fe4d5d7ce 100644 --- a/ingestion/tests/unit/profiler/test_workflow.py +++ b/ingestion/tests/unit/profiler/test_workflow.py @@ -49,7 +49,7 @@ from metadata.workflow.profiler import ProfilerWorkflow TABLE = Table( id=uuid.uuid4(), name="users", - fullyQualifiedName="service.db.users", + fullyQualifiedName="my_service.db.users", columns=[ Column(name="id", dataType=DataType.INT), Column(name="name", dataType=DataType.STRING), @@ -141,10 +141,10 @@ def test_filter_entities(mocked_method): We can properly filter entities depending on the workflow configuration """ + service_name = "my_service" workflow = ProfilerWorkflow.create(config) mocked_method.assert_called() - service_name = "service" schema_reference1 = EntityReference( id=uuid.uuid4(), name="one_schema", type="databaseSchema" ) @@ -159,6 +159,7 @@ def test_filter_entities(mocked_method): databaseSchema=schema_reference1, fullyQualifiedName=f"{service_name}.db.{schema_reference1.name}.table1", columns=[Column(name="id", dataType=DataType.BIGINT)], + database=EntityReference(id=uuid.uuid4(), name="db", type="database"), ), Table( id=uuid.uuid4(), @@ -166,6 +167,7 @@ def test_filter_entities(mocked_method): databaseSchema=schema_reference1, fullyQualifiedName=f"{service_name}.db.{schema_reference1.name}.table2", columns=[Column(name="id", dataType=DataType.BIGINT)], + database=EntityReference(id=uuid.uuid4(), name="db", type="database"), ), Table( id=uuid.uuid4(), @@ -173,12 +175,52 @@ def test_filter_entities(mocked_method): databaseSchema=schema_reference2, fullyQualifiedName=f"{service_name}.db.{schema_reference2.name}.table3", columns=[Column(name="id", dataType=DataType.BIGINT)], + database=EntityReference(id=uuid.uuid4(), name="db", type="database"), ), ] # Simple workflow does not filter assert len(list(workflow.source.filter_entities(all_tables))) == 3 + fqn_filter_config = deepcopy(config) + 
fqn_filter_config["source"]["sourceConfig"]["config"]["useFqnForFiltering"] = True + fqn_filter_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = { + "excludes": ["my_service.db.another_schema"] + } + fqn_filter_workflow = ProfilerWorkflow.create(fqn_filter_config) + mocked_method.assert_called() + + assert len(list(fqn_filter_workflow.source.filter_entities(all_tables))) == 2 + + fqn_filter_config_2 = deepcopy(config) + fqn_filter_config_2["source"]["sourceConfig"]["config"]["useFqnForFiltering"] = True + fqn_filter_config_2["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = { + "includes": ["my_service.db.one_schema"] + } + fqn_filter_workflow_2 = ProfilerWorkflow.create(fqn_filter_config_2) + mocked_method.assert_called() + + assert len(list(fqn_filter_workflow_2.source.filter_entities(all_tables))) == 2 + + fqn_filter_config_3 = deepcopy(config) + fqn_filter_config_3["source"]["sourceConfig"]["config"]["useFqnForFiltering"] = True + fqn_filter_config_3["source"]["sourceConfig"]["config"]["tableFilterPattern"] = { + "includes": ["my_service.db.one_schema.table1"] + } + fqn_filter_workflow_3 = ProfilerWorkflow.create(fqn_filter_config_3) + mocked_method.assert_called() + + assert len(list(fqn_filter_workflow_3.source.filter_entities(all_tables))) == 1 + + fqn_filter_config_4 = deepcopy(config) + fqn_filter_config_4["source"]["sourceConfig"]["config"]["useFqnForFiltering"] = True + fqn_filter_config_4["source"]["sourceConfig"]["config"]["tableFilterPattern"] = { + "excludes": ["my_service.db.one_schema.table1"] + } + fqn_filter_workflow_4 = ProfilerWorkflow.create(fqn_filter_config_4) + mocked_method.assert_called() + + assert len(list(fqn_filter_workflow_4.source.filter_entities(all_tables))) == 2 # We can exclude based on the schema name exclude_config = deepcopy(config) exclude_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = { diff --git 
a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json index 826eb6dfd86..d5951393c00 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json @@ -79,12 +79,17 @@ "default": 5, "title": "Thread Count" }, - "timeoutSeconds": { "description": "Profiler Timeout in Seconds", "type": "integer", "default": 43200, "title": "Timeout (in sec.)" + }, + "useFqnForFiltering": { + "description": "Regex will be applied on fully qualified name (e.g. service_name.db_name.schema_name.table_name) instead of raw name (e.g. table_name)", + "type": "boolean", + "default": false, + "title": "Use FQN For Filtering" } }, "additionalProperties": false