Fixes #11570 - Add support for BQ Multi-project Profiler (#11692)

* fix: extracted profiler object from workflow and implemented factory to allow service base logic

* fix: ran python linting

* fix: renamed `base` to `base_profiler_source`

* fix: add logic to set correct database for BQ multi project ID connections

* fix: ran python linting
This commit is contained in:
Teddy 2023-05-20 23:22:53 +02:00 committed by GitHub
parent b67e8f5fc0
commit ddbc7fe14d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 390 additions and 163 deletions

View File

@ -17,15 +17,13 @@ Workflow definition for the ORM Profiler.
- How to define metrics & tests
"""
import traceback
from copy import deepcopy
from typing import Iterable, List, Optional, Union, cast
from typing import Iterable, Optional, cast
from pydantic import ValidationError
from sqlalchemy import MetaData
from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import ColumnProfilerConfig, Table
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
@ -53,21 +51,10 @@ from metadata.ingestion.models.custom_types import ServiceWithConnectionType
from metadata.ingestion.ometa.client_utils import create_ometa_client
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.profiler.api.models import (
ProfilerProcessorConfig,
ProfilerResponse,
TableConfig,
)
from metadata.profiler.interface.pandas.pandas_profiler_interface import (
PandasProfilerInterface,
)
from metadata.profiler.interface.profiler_protocol import ProfilerProtocol
from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.api.models import ProfilerProcessorConfig, ProfilerResponse
from metadata.profiler.processor.core import Profiler
from metadata.profiler.processor.default import DefaultProfiler, get_default_metrics
from metadata.profiler.source.base_profiler_source import BaseProfilerSource
from metadata.profiler.source.profiler_source_factory import profiler_source_factory
from metadata.timer.repeated_timer import RepeatedTimer
from metadata.timer.workflow_reporter import get_ingestion_status_timer
from metadata.utils import fqn
@ -112,7 +99,7 @@ class ProfilerWorkflow(WorkflowStatusMixin):
self.profiler_config = ProfilerProcessorConfig.parse_obj(
self.config.processor.dict().get("config")
)
self.metadata = OpenMetadata(self.metadata_config)
self.metadata = create_ometa_client(self.metadata_config)
self._retrieve_service_connection_if_needed()
self.test_connection()
self.set_ingestion_pipeline_status(state=PipelineState.running)
@ -168,71 +155,6 @@ class ProfilerWorkflow(WorkflowStatusMixin):
return self._timer
def get_config_for_entity(self, entity: Table) -> Optional[TableConfig]:
"""Get config for a specific entity
Args:
entity: table entity
"""
if not self.profiler_config.tableConfig:
return None
return next(
(
table_config
for table_config in self.profiler_config.tableConfig
if table_config.fullyQualifiedName.__root__
== entity.fullyQualifiedName.__root__ # type: ignore
),
None,
)
def get_include_columns(self, entity) -> Optional[List[ColumnProfilerConfig]]:
"""get included columns"""
entity_config: Optional[TableConfig] = self.get_config_for_entity(entity)
if entity_config and entity_config.columnConfig:
return entity_config.columnConfig.includeColumns
if entity.tableProfilerConfig:
return entity.tableProfilerConfig.includeColumns
return None
def get_exclude_columns(self, entity) -> Optional[List[str]]:
"""get included columns"""
entity_config: Optional[TableConfig] = self.get_config_for_entity(entity)
if entity_config and entity_config.columnConfig:
return entity_config.columnConfig.excludeColumns
if entity.tableProfilerConfig:
return entity.tableProfilerConfig.excludeColumns
return None
def create_profiler(
self, table_entity: Table, profiler_interface: ProfilerProtocol
):
"""Profile a single entity"""
if not self.profiler_config.profiler:
self.profiler = DefaultProfiler(
profiler_interface=profiler_interface,
include_columns=self.get_include_columns(table_entity),
exclude_columns=self.get_exclude_columns(table_entity),
)
else:
metrics = (
[Metrics.get(name) for name in self.profiler_config.profiler.metrics]
if self.profiler_config.profiler.metrics
else get_default_metrics(profiler_interface.table)
)
self.profiler = Profiler(
*metrics, # type: ignore
profiler_interface=profiler_interface,
include_columns=self.get_include_columns(table_entity),
exclude_columns=self.get_exclude_columns(table_entity),
)
def filter_databases(self, database: Database) -> Optional[Database]:
"""Returns filtered database entities"""
if filter_by_database(
@ -338,51 +260,17 @@ class ProfilerWorkflow(WorkflowStatusMixin):
yield from self.filter_entities(tables)
def copy_service_config(self, database) -> DatabaseService.__config__:
copy_service_connection_config = deepcopy(
self.config.source.serviceConnection.__root__.config # type: ignore
)
if hasattr(
self.config.source.serviceConnection.__root__.config, # type: ignore
"supportsDatabase",
):
if hasattr(copy_service_connection_config, "database"):
copy_service_connection_config.database = database.name.__root__ # type: ignore
if hasattr(copy_service_connection_config, "catalog"):
copy_service_connection_config.catalog = database.name.__root__ # type: ignore
# we know we'll only be working with databaseServices, we cast the type to satisfy type checker
copy_service_connection_config = cast(
DatabaseService.__config__, copy_service_connection_config
)
return copy_service_connection_config
def run_profiler(
self, entity: Table, copied_service_config, sqa_metadata=None
self, entity: Table, profiler_source: BaseProfilerSource
) -> Optional[ProfilerResponse]:
"""
Main logic for the profiler workflow
"""
try:
profiler_interface: Union[
SQAProfilerInterface, PandasProfilerInterface
] = ProfilerProtocol.create(
(
copied_service_config.__class__.__name__
if isinstance(copied_service_config, NON_SQA_DATABASE_CONNECTIONS)
else self.config.source.serviceConnection.__root__.__class__.__name__
),
entity,
self.get_config_for_entity(entity),
self.source_config,
copied_service_config,
create_ometa_client(self.metadata_config),
sqa_metadata=sqa_metadata,
) # type: ignore
self.create_profiler(entity, profiler_interface)
self.profiler = cast(Profiler, self.profiler) # satisfy type checker
profile: ProfilerResponse = self.profiler.process(
profiler_runner: Profiler = profiler_source.get_profiler_runner(
entity, self.profiler_config
)
profile: ProfilerResponse = profiler_runner.process(
self.source_config.generateSampleData,
self.source_config.processPiiSensitive,
)
@ -394,18 +282,20 @@ class ProfilerWorkflow(WorkflowStatusMixin):
self.source_status.failed(name, error, traceback.format_exc())
try:
# if we fail to instantiate a profiler_interface, we won't have a profiler_interface variable
# we'll also catch scenarios where we don't have an interface set
self.source_status.fail_all(
profiler_interface.processor_status.failures
profiler_source.interface.processor_status.failures
)
self.source_status.records.extend(
profiler_interface.processor_status.records
profiler_source.interface.processor_status.records
)
except UnboundLocalError:
except (UnboundLocalError, AttributeError):
pass
else:
self.source_status.fail_all(profiler_interface.processor_status.failures)
# at this point we know we have an interface variable since we the `try` block above didn't raise
self.source_status.fail_all(profiler_source.interface.processor_status.failures) # type: ignore
self.source_status.records.extend(
profiler_interface.processor_status.records
profiler_source.interface.processor_status.records # type: ignore
)
return profile
@ -419,18 +309,14 @@ class ProfilerWorkflow(WorkflowStatusMixin):
try:
for database in self.get_database_entities():
copied_service_config = self.copy_service_config(database)
sqa_metadata = (
MetaData()
if not isinstance(
copied_service_config, NON_SQA_DATABASE_CONNECTIONS
)
else None
) # we only need this for sqlalchemy based services
profiler_source = profiler_source_factory.create(
self.config.source.type.lower(),
self.config,
database,
self.metadata,
)
for entity in self.get_table_entities(database=database):
profile = self.run_profiler(
entity, copied_service_config, sqa_metadata
)
profile = self.run_profiler(entity, profiler_source)
if hasattr(self, "sink") and profile:
self.sink.write_record(profile)
# At the end of the `execute`, update the associated Ingestion Pipeline status as success

View File

@ -0,0 +1,228 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base source for the profiler used to instantiate a profiler runner with
its interface
"""
from copy import deepcopy
from typing import List, Optional, Union, cast
from sqlalchemy import MetaData
from metadata.generated.schema.entity.data.table import ColumnProfilerConfig, Table
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
)
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.models import ProfilerProcessorConfig, TableConfig
from metadata.profiler.interface.pandas.pandas_profiler_interface import (
PandasProfilerInterface,
)
from metadata.profiler.interface.profiler_protocol import ProfilerProtocol
from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.processor.core import Profiler
from metadata.profiler.processor.default import DefaultProfiler, get_default_metrics
NON_SQA_DATABASE_CONNECTIONS = (DatalakeConnection,)
class BaseProfilerSource:
"""
Base class for the profiler source
"""
def __init__(
self,
config: OpenMetadataWorkflowConfig,
database: DatabaseService,
ometa_client: OpenMetadata,
):
self.service_conn_config = self._copy_service_config(config, database)
self.source_config = config.source.sourceConfig.config
self.source_config = cast(
DatabaseServiceProfilerPipeline, self.source_config
) # satisfy type checker
self.profiler_config = ProfilerProcessorConfig.parse_obj(
config.processor.dict().get("config")
)
self.ometa_client = ometa_client
self.profiler_interface_type: str = self._get_profiler_interface_type(config)
self.sqa_metadata = self._set_sqa_metadata()
self._interface = None
@property
def interface(
self,
) -> Optional[Union[SQAProfilerInterface, PandasProfilerInterface]]:
"""Get the interface"""
return self._interface
@interface.setter
def interface(self, interface):
"""Set the interface"""
self._interface = interface
def _set_sqa_metadata(self):
"""Set sqlalchemy metadata"""
if not isinstance(self.service_conn_config, NON_SQA_DATABASE_CONNECTIONS):
return MetaData()
return None
def _get_profiler_interface_type(self, config) -> str:
"""_summary_
Args:
config (_type_): profiler config
Returns:
str:
"""
if isinstance(self.service_conn_config, NON_SQA_DATABASE_CONNECTIONS):
return self.service_conn_config.__class__.__name__
return config.source.serviceConnection.__root__.__class__.__name__
def _get_config_for_table(
self, entity: Table, profiler_config
) -> Optional[TableConfig]:
"""Get config for a specific entity
Args:
entity: table entity
"""
if not profiler_config.tableConfig:
return None
return next(
(
table_config
for table_config in profiler_config.tableConfig
if table_config.fullyQualifiedName.__root__
== entity.fullyQualifiedName.__root__ # type: ignore
),
None,
)
def _get_include_columns(
self, entity, entity_config: Optional[TableConfig]
) -> Optional[List[ColumnProfilerConfig]]:
"""get included columns"""
if entity_config and entity_config.columnConfig:
return entity_config.columnConfig.includeColumns
if entity.tableProfilerConfig:
return entity.tableProfilerConfig.includeColumns
return None
def _get_exclude_columns(
self, entity, entity_config: Optional[TableConfig]
) -> Optional[List[str]]:
"""get included columns"""
if entity_config and entity_config.columnConfig:
return entity_config.columnConfig.excludeColumns
if entity.tableProfilerConfig:
return entity.tableProfilerConfig.excludeColumns
return None
def _copy_service_config(
self, config: OpenMetadataWorkflowConfig, database: DatabaseService
) -> DatabaseConnection:
"""Make a copy of the service config and update the database name
Args:
database (_type_): a database entity
Returns:
DatabaseService.__config__
"""
config_copy = deepcopy(
config.source.serviceConnection.__root__.config # type: ignore
)
if hasattr(
config_copy, # type: ignore
"supportsDatabase",
):
if hasattr(config_copy, "database"):
config_copy.database = database.name.__root__ # type: ignore
if hasattr(config_copy, "catalog"):
config_copy.catalog = database.name.__root__ # type: ignore
# we know we'll only be working with DatabaseConnection, we cast the type to satisfy type checker
config_copy = cast(DatabaseConnection, config_copy)
return config_copy
def create_profiler_interface(
self,
entity: Table,
table_config: Optional[TableConfig],
) -> Union[SQAProfilerInterface, PandasProfilerInterface]:
"""Create sqlalchemy profiler interface"""
profiler_interface: Union[
SQAProfilerInterface, PandasProfilerInterface
] = ProfilerProtocol.create(
self.profiler_interface_type,
entity,
table_config,
self.source_config,
self.service_conn_config,
self.ometa_client,
sqa_metadata=self.sqa_metadata,
) # type: ignore
self.interface = profiler_interface
return self.interface
def get_profiler_runner(
self, entity: Table, profiler_config: ProfilerProcessorConfig
) -> Profiler:
"""
Returns the runner for the profiler
"""
table_config = self._get_config_for_table(entity, profiler_config)
profiler_interface = self.create_profiler_interface(
entity,
table_config,
)
if not profiler_config.profiler:
return DefaultProfiler(
profiler_interface=profiler_interface,
include_columns=self._get_include_columns(entity, table_config),
exclude_columns=self._get_exclude_columns(entity, table_config),
)
metrics = (
[Metrics.get(name) for name in profiler_config.profiler.metrics]
if profiler_config.profiler.metrics
else get_default_metrics(profiler_interface.table)
)
return Profiler(
*metrics, # type: ignore
profiler_interface=profiler_interface,
include_columns=self._get_include_columns(entity, table_config),
exclude_columns=self._get_exclude_columns(entity, table_config),
)

View File

@ -0,0 +1,61 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Bigquery Profiler source
"""
from copy import deepcopy
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.security.credentials.gcsValues import (
GcsCredentialsValues,
MultipleProjectId,
SingleProjectId,
)
from metadata.profiler.source.base_profiler_source import BaseProfilerSource
class BigQueryProfilerSource(BaseProfilerSource):
"""override the base profiler source to handle BigQuery specific connection configs"""
def _copy_service_config(
self, config: OpenMetadataWorkflowConfig, database: DatabaseService
) -> BigQueryConnection:
"""Make a copy of the database connection config. If MultiProjectId is used, replace it
with SingleProjectId with the database name being profiled. We iterate over all non filtered
database in workflow.py `def execute`.
Args:
database (DatabaseService): a database entity
Returns:
DatabaseConnection
"""
config_copy: BigQueryConnection = deepcopy(
config.source.serviceConnection.__root__.config # type: ignore
)
if isinstance(config_copy.credentials.gcsConfig, GcsCredentialsValues):
if isinstance(
config_copy.credentials.gcsConfig.projectId, MultipleProjectId
):
config_copy.credentials.gcsConfig.projectId = SingleProjectId(
__root__=database.name.__root__
)
return config_copy

View File

@ -0,0 +1,45 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Factory class for creating profiler source objects
"""
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigqueryType,
)
from metadata.profiler.source.base_profiler_source import BaseProfilerSource
from metadata.profiler.source.bigquery.profiler_source import BigQueryProfilerSource
class ProfilerSourceFactory:
"""Creational factory for profiler source objects"""
def __init__(self):
self._source_type = {"base": BaseProfilerSource}
def register_source(self, source_type: str, source_class):
"""Register a new source type"""
self._source_type[source_type] = source_class
def create(self, source_type: str, *args, **kwargs) -> BaseProfilerSource:
"""Create source object based on source type"""
source_class = self._source_type.get(source_type)
if not source_class:
source_class = self._source_type["base"]
return source_class(*args, **kwargs)
return source_class(*args, **kwargs)
profiler_source_factory = ProfilerSourceFactory()
profiler_source_factory.register_source(
BigqueryType.BigQuery.value.lower(), BigQueryProfilerSource
)

View File

@ -30,7 +30,11 @@ from metadata.generated.schema.entity.data.table import (
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
@ -42,6 +46,7 @@ from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.processor.default import DefaultProfiler
from metadata.profiler.source.base_profiler_source import BaseProfilerSource
TABLE = Table(
id=uuid.uuid4(),
@ -228,20 +233,21 @@ def test_profile_def(mocked_method, mocked_orm): # pylint: disable=unused-argum
profile_workflow = ProfilerWorkflow.create(profile_config)
mocked_method.assert_called()
profiler_interface: SQAProfilerInterface = ProfilerProtocol.create(
_profiler_type=DatabaseConnection.__name__,
entity=TABLE,
entity_config=profile_workflow.get_config_for_entity(TABLE),
source_config=profile_workflow.source_config,
service_connection_config=profile_workflow.config.source.serviceConnection.__root__.config,
ometa_client=None,
sqa_metadata=MetaData(),
profiler_source = BaseProfilerSource(
profile_workflow.config,
DatabaseService(
id=uuid.uuid4(),
name="myDataBaseService",
serviceType=DatabaseServiceType.SQLite,
), # type: ignore
profile_workflow.metadata,
)
profiler_runner = profiler_source.get_profiler_runner(
TABLE, profile_workflow.profiler_config
)
profile_workflow.create_profiler(TABLE, profiler_interface)
profiler_obj_metrics = [
metric.name() for metric in profile_workflow.profiler.metrics
]
# profile_workflow.create_profiler(TABLE, profiler_interface)
profiler_obj_metrics = [metric.name() for metric in profiler_runner.metrics]
assert profile_workflow.profiler_config.profiler
assert config_metrics_label == profiler_obj_metrics
@ -268,20 +274,21 @@ def test_default_profile_def(
profile_workflow = ProfilerWorkflow.create(config)
mocked_method.assert_called()
profiler_interface: SQAProfilerInterface = ProfilerProtocol.create(
_profiler_type=DatabaseConnection.__name__,
entity=TABLE,
entity_config=profile_workflow.get_config_for_entity(TABLE),
source_config=profile_workflow.source_config,
service_connection_config=profile_workflow.config.source.serviceConnection.__root__.config,
ometa_client=None,
sqa_metadata=MetaData(),
profiler_source = BaseProfilerSource(
profile_workflow.config,
DatabaseService(
id=uuid.uuid4(),
name="myDataBaseService",
serviceType=DatabaseServiceType.SQLite,
), # type: ignore
profile_workflow.metadata,
)
profiler_runner = profiler_source.get_profiler_runner(
TABLE, profile_workflow.profiler_config
)
profile_workflow.create_profiler(TABLE, profiler_interface)
assert isinstance(
profile_workflow.profiler,
profiler_runner,
DefaultProfiler,
)