Fixes 10709: Add useFqnForFiltering to profiler workflow (#14717)

This commit is contained in:
Ayush Shah 2024-01-18 18:52:43 +05:30 committed by GitHub
parent e7665a83e5
commit 831fce5b7e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 93 additions and 55 deletions

View File

@@ -17,6 +17,7 @@ from typing import Iterable, Optional, cast
 from pydantic import BaseModel
 from metadata.generated.schema.entity.data.database import Database
+from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
 from metadata.generated.schema.entity.data.table import Table, TableType
 from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.generated.schema.entity.services.ingestionPipelines.status import (
@@ -117,7 +118,6 @@ class OpenMetadataSource(Source):
         self.metadata.health_check()

     def _iter(self, *_, **__) -> Iterable[Either[ProfilerSourceAndEntity]]:
         for database in self.get_database_entities():
             try:
                 profiler_source = profiler_source_factory.create(
@@ -149,9 +149,17 @@ class OpenMetadataSource(Source):
     def filter_databases(self, database: Database) -> Optional[Database]:
         """Returns filtered database entities"""
+        database_fqn = fqn.build(
+            self.metadata,
+            entity_type=Database,
+            service_name=self.config.source.serviceName,
+            database_name=database.name.__root__,
+        )
         if filter_by_database(
             self.source_config.databaseFilterPattern,
-            database.name.__root__,
+            database_fqn
+            if self.source_config.useFqnForFiltering
+            else database.name.__root__,
         ):
             self.status.filter(database.name.__root__, "Database pattern not allowed")
             return None
@@ -166,18 +174,38 @@ class OpenMetadataSource(Source):
         """
         for table in tables:
             try:
+                schema_fqn = fqn.build(
+                    self.metadata,
+                    entity_type=DatabaseSchema,
+                    service_name=self.config.source.serviceName,
+                    database_name=table.database.name,
+                    schema_name=table.databaseSchema.name,
+                )
                 if filter_by_schema(
                     self.source_config.schemaFilterPattern,
-                    table.databaseSchema.name,  # type: ignore
+                    schema_fqn
+                    if self.source_config.useFqnForFiltering
+                    else table.databaseSchema.name,  # type: ignore
                 ):
                     self.status.filter(
                         f"Schema pattern not allowed: {table.fullyQualifiedName.__root__}",
                         "Schema pattern not allowed",
                     )
                     continue
+                table_fqn = fqn.build(
+                    self.metadata,
+                    entity_type=Table,
+                    service_name=self.config.source.serviceName,
+                    database_name=table.database.name,
+                    schema_name=table.databaseSchema.name,
+                    table_name=table.name.__root__,
+                )
                 if filter_by_table(
                     self.source_config.tableFilterPattern,
-                    table.name.__root__,
+                    table_fqn
+                    if self.source_config.useFqnForFiltering
+                    else table.name.__root__,
                 ):
                     self.status.filter(
                         f"Table pattern not allowed: {table.fullyQualifiedName.__root__}",

View File

@@ -26,7 +26,7 @@ from typing import Iterable, cast
 from sqlalchemy.inspection import inspect
 from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.table import Table, TableType
+from metadata.generated.schema.entity.data.table import Table
 from metadata.generated.schema.entity.services.ingestionPipelines.status import (
     StackTraceError,
 )
@@ -197,8 +197,17 @@ class OpenMetadataSourceExt(OpenMetadataSource):
             else:
                 database_names = self.source.get_database_names_raw()
                 for database in database_names:
+                    database_fqn = fqn.build(
+                        self.metadata,
+                        entity_type=Database,
+                        service_name=self.config.source.serviceName,
+                        database_name=database,
+                    )
                     if filter_by_database(
-                        self.source_config.databaseFilterPattern, database
+                        self.source_config.databaseFilterPattern,
+                        database_fqn
+                        if self.source_config.useFqnForFiltering
+                        else database,
                     ):
                         self.status.filter(database, "Database pattern not allowed")
                         continue
@@ -216,52 +225,6 @@ class OpenMetadataSourceExt(OpenMetadataSource):
             logger.debug(f"Failed to fetch database names {exc}")
             logger.debug(traceback.format_exc())

-    def filter_entities(self, tables: Iterable[Table]) -> Iterable[Table]:
-        """
-        From a list of tables, apply the SQLSourceConfig
-        filter patterns.
-        We will update the status on the SQLSource Status.
-        """
-        for table in tables:
-            try:
-                if filter_by_schema(
-                    self.source_config.schemaFilterPattern,
-                    table.databaseSchema.name,  # type: ignore
-                ):
-                    self.status.filter(
-                        f"Schema pattern not allowed: {table.fullyQualifiedName.__root__}",
-                        "Schema pattern not allowed",
-                    )
-                    continue
-                if filter_by_table(
-                    self.source_config.tableFilterPattern,
-                    table.name.__root__,
-                ):
-                    self.status.filter(
-                        f"Table pattern not allowed: {table.fullyQualifiedName.__root__}",
-                        "Table pattern not allowed",
-                    )
-                    continue
-                if (
-                    table.tableType == TableType.View
-                    and not self.source_config.includeViews
-                ):
-                    self.status.filter(
-                        table.fullyQualifiedName.__root__,
-                        "View filtered out",
-                    )
-                    continue
-                yield table
-            except Exception as exc:
-                self.status.failed(
-                    StackTraceError(
-                        name=table.fullyQualifiedName.__root__,
-                        error=f"Unexpected error filtering entities for table [{table}]: {exc}",
-                        stackTrace=traceback.format_exc(),
-                    )
-                )
-
     def get_table_entities(self, database):
         """
         List and filter OpenMetadata tables based on the

View File

@@ -49,7 +49,7 @@ from metadata.workflow.profiler import ProfilerWorkflow
 TABLE = Table(
     id=uuid.uuid4(),
     name="users",
-    fullyQualifiedName="service.db.users",
+    fullyQualifiedName="my_service.db.users",
     columns=[
         Column(name="id", dataType=DataType.INT),
         Column(name="name", dataType=DataType.STRING),
@@ -141,10 +141,10 @@ def test_filter_entities(mocked_method):
     We can properly filter entities depending on the
     workflow configuration
     """
+    service_name = "my_service"
     workflow = ProfilerWorkflow.create(config)
     mocked_method.assert_called()
-    service_name = "service"
     schema_reference1 = EntityReference(
         id=uuid.uuid4(), name="one_schema", type="databaseSchema"
     )
@@ -159,6 +159,7 @@ def test_filter_entities(mocked_method):
             databaseSchema=schema_reference1,
             fullyQualifiedName=f"{service_name}.db.{schema_reference1.name}.table1",
             columns=[Column(name="id", dataType=DataType.BIGINT)],
+            database=EntityReference(id=uuid.uuid4(), name="db", type="database"),
         ),
         Table(
             id=uuid.uuid4(),
Table( Table(
id=uuid.uuid4(), id=uuid.uuid4(),
@@ -166,6 +167,7 @@ def test_filter_entities(mocked_method):
             databaseSchema=schema_reference1,
             fullyQualifiedName=f"{service_name}.db.{schema_reference1.name}.table2",
             columns=[Column(name="id", dataType=DataType.BIGINT)],
+            database=EntityReference(id=uuid.uuid4(), name="db", type="database"),
         ),
         Table(
             id=uuid.uuid4(),
Table( Table(
id=uuid.uuid4(), id=uuid.uuid4(),
@@ -173,12 +175,52 @@ def test_filter_entities(mocked_method):
             databaseSchema=schema_reference2,
             fullyQualifiedName=f"{service_name}.db.{schema_reference2.name}.table3",
             columns=[Column(name="id", dataType=DataType.BIGINT)],
+            database=EntityReference(id=uuid.uuid4(), name="db", type="database"),
         ),
     ]

     # Simple workflow does not filter
     assert len(list(workflow.source.filter_entities(all_tables))) == 3

+    fqn_filter_config = deepcopy(config)
+    fqn_filter_config["source"]["sourceConfig"]["config"]["useFqnForFiltering"] = True
+    fqn_filter_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = {
+        "excludes": ["my_service.db.another_schema"]
+    }
+    fqn_filter_workflow = ProfilerWorkflow.create(fqn_filter_config)
+    mocked_method.assert_called()
+    assert len(list(fqn_filter_workflow.source.filter_entities(all_tables))) == 2
+
+    fqn_filter_config_2 = deepcopy(config)
+    fqn_filter_config_2["source"]["sourceConfig"]["config"]["useFqnForFiltering"] = True
+    fqn_filter_config_2["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = {
+        "includes": ["my_service.db.one_schema"]
+    }
+    fqn_filter_workflow_2 = ProfilerWorkflow.create(fqn_filter_config_2)
+    mocked_method.assert_called()
+    assert len(list(fqn_filter_workflow_2.source.filter_entities(all_tables))) == 2
+
+    fqn_filter_config_3 = deepcopy(config)
+    fqn_filter_config_3["source"]["sourceConfig"]["config"]["useFqnForFiltering"] = True
+    fqn_filter_config_3["source"]["sourceConfig"]["config"]["tableFilterPattern"] = {
+        "includes": ["my_service.db.one_schema.table1"]
+    }
+    fqn_filter_workflow_3 = ProfilerWorkflow.create(fqn_filter_config_3)
+    mocked_method.assert_called()
+    assert len(list(fqn_filter_workflow_3.source.filter_entities(all_tables))) == 1
+
+    fqn_filter_config_4 = deepcopy(config)
+    fqn_filter_config_4["source"]["sourceConfig"]["config"]["useFqnForFiltering"] = True
+    fqn_filter_config_4["source"]["sourceConfig"]["config"]["tableFilterPattern"] = {
+        "excludes": ["my_service.db.one_schema.table1"]
+    }
+    fqn_filter_workflow_4 = ProfilerWorkflow.create(fqn_filter_config_4)
+    mocked_method.assert_called()
+    assert len(list(fqn_filter_workflow_4.source.filter_entities(all_tables))) == 2
+
     # We can exclude based on the schema name
     exclude_config = deepcopy(config)
     exclude_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = {

View File

@@ -79,12 +79,17 @@
       "default": 5,
       "title": "Thread Count"
     },
     "timeoutSeconds": {
       "description": "Profiler Timeout in Seconds",
       "type": "integer",
       "default": 43200,
       "title": "Timeout (in sec.)"
+    },
+    "useFqnForFiltering": {
+      "description": "Regex will be applied on fully qualified name (e.g service_name.db_name.schema_name.table_name) instead of raw name (e.g. table_name)",
+      "type": "boolean",
+      "default": false,
+      "title": "Use FQN For Filtering"
     }
   },
   "additionalProperties": false