diff --git a/ingestion/src/metadata/profiler/api/workflow.py b/ingestion/src/metadata/profiler/api/workflow.py index f3cb1eb5576..d24a2577bd5 100644 --- a/ingestion/src/metadata/profiler/api/workflow.py +++ b/ingestion/src/metadata/profiler/api/workflow.py @@ -17,15 +17,13 @@ Workflow definition for the ORM Profiler. - How to define metrics & tests """ import traceback -from copy import deepcopy -from typing import Iterable, List, Optional, Union, cast +from typing import Iterable, Optional, cast from pydantic import ValidationError -from sqlalchemy import MetaData from metadata.config.common import WorkflowExecutionError from metadata.generated.schema.entity.data.database import Database -from metadata.generated.schema.entity.data.table import ColumnProfilerConfig, Table +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.database.datalakeConnection import ( DatalakeConnection, ) @@ -53,21 +51,10 @@ from metadata.ingestion.models.custom_types import ServiceWithConnectionType from metadata.ingestion.ometa.client_utils import create_ometa_client from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn -from metadata.profiler.api.models import ( - ProfilerProcessorConfig, - ProfilerResponse, - TableConfig, -) -from metadata.profiler.interface.pandas.pandas_profiler_interface import ( - PandasProfilerInterface, -) -from metadata.profiler.interface.profiler_protocol import ProfilerProtocol -from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import ( - SQAProfilerInterface, -) -from metadata.profiler.metrics.registry import Metrics +from metadata.profiler.api.models import ProfilerProcessorConfig, ProfilerResponse from metadata.profiler.processor.core import Profiler -from metadata.profiler.processor.default import DefaultProfiler, get_default_metrics +from 
metadata.profiler.source.base_profiler_source import BaseProfilerSource +from metadata.profiler.source.profiler_source_factory import profiler_source_factory from metadata.timer.repeated_timer import RepeatedTimer from metadata.timer.workflow_reporter import get_ingestion_status_timer from metadata.utils import fqn @@ -112,7 +99,7 @@ class ProfilerWorkflow(WorkflowStatusMixin): self.profiler_config = ProfilerProcessorConfig.parse_obj( self.config.processor.dict().get("config") ) - self.metadata = OpenMetadata(self.metadata_config) + self.metadata = create_ometa_client(self.metadata_config) self._retrieve_service_connection_if_needed() self.test_connection() self.set_ingestion_pipeline_status(state=PipelineState.running) @@ -168,71 +155,6 @@ class ProfilerWorkflow(WorkflowStatusMixin): return self._timer - def get_config_for_entity(self, entity: Table) -> Optional[TableConfig]: - """Get config for a specific entity - - Args: - entity: table entity - """ - - if not self.profiler_config.tableConfig: - return None - return next( - ( - table_config - for table_config in self.profiler_config.tableConfig - if table_config.fullyQualifiedName.__root__ - == entity.fullyQualifiedName.__root__ # type: ignore - ), - None, - ) - - def get_include_columns(self, entity) -> Optional[List[ColumnProfilerConfig]]: - """get included columns""" - entity_config: Optional[TableConfig] = self.get_config_for_entity(entity) - if entity_config and entity_config.columnConfig: - return entity_config.columnConfig.includeColumns - - if entity.tableProfilerConfig: - return entity.tableProfilerConfig.includeColumns - - return None - - def get_exclude_columns(self, entity) -> Optional[List[str]]: - """get included columns""" - entity_config: Optional[TableConfig] = self.get_config_for_entity(entity) - if entity_config and entity_config.columnConfig: - return entity_config.columnConfig.excludeColumns - - if entity.tableProfilerConfig: - return entity.tableProfilerConfig.excludeColumns - - return None 
- - def create_profiler( - self, table_entity: Table, profiler_interface: ProfilerProtocol - ): - """Profile a single entity""" - if not self.profiler_config.profiler: - self.profiler = DefaultProfiler( - profiler_interface=profiler_interface, - include_columns=self.get_include_columns(table_entity), - exclude_columns=self.get_exclude_columns(table_entity), - ) - else: - metrics = ( - [Metrics.get(name) for name in self.profiler_config.profiler.metrics] - if self.profiler_config.profiler.metrics - else get_default_metrics(profiler_interface.table) - ) - - self.profiler = Profiler( - *metrics, # type: ignore - profiler_interface=profiler_interface, - include_columns=self.get_include_columns(table_entity), - exclude_columns=self.get_exclude_columns(table_entity), - ) - def filter_databases(self, database: Database) -> Optional[Database]: """Returns filtered database entities""" if filter_by_database( @@ -338,51 +260,17 @@ class ProfilerWorkflow(WorkflowStatusMixin): yield from self.filter_entities(tables) - def copy_service_config(self, database) -> DatabaseService.__config__: - copy_service_connection_config = deepcopy( - self.config.source.serviceConnection.__root__.config # type: ignore - ) - if hasattr( - self.config.source.serviceConnection.__root__.config, # type: ignore - "supportsDatabase", - ): - if hasattr(copy_service_connection_config, "database"): - copy_service_connection_config.database = database.name.__root__ # type: ignore - if hasattr(copy_service_connection_config, "catalog"): - copy_service_connection_config.catalog = database.name.__root__ # type: ignore - - # we know we'll only be working with databaseServices, we cast the type to satisfy type checker - copy_service_connection_config = cast( - DatabaseService.__config__, copy_service_connection_config - ) - - return copy_service_connection_config - def run_profiler( - self, entity: Table, copied_service_config, sqa_metadata=None + self, entity: Table, profiler_source: BaseProfilerSource ) -> 
Optional[ProfilerResponse]: """ Main logic for the profiler workflow """ try: - profiler_interface: Union[ - SQAProfilerInterface, PandasProfilerInterface - ] = ProfilerProtocol.create( - ( - copied_service_config.__class__.__name__ - if isinstance(copied_service_config, NON_SQA_DATABASE_CONNECTIONS) - else self.config.source.serviceConnection.__root__.__class__.__name__ - ), - entity, - self.get_config_for_entity(entity), - self.source_config, - copied_service_config, - create_ometa_client(self.metadata_config), - sqa_metadata=sqa_metadata, - ) # type: ignore - self.create_profiler(entity, profiler_interface) - self.profiler = cast(Profiler, self.profiler) # satisfy type checker - profile: ProfilerResponse = self.profiler.process( + profiler_runner: Profiler = profiler_source.get_profiler_runner( + entity, self.profiler_config + ) + profile: ProfilerResponse = profiler_runner.process( self.source_config.generateSampleData, self.source_config.processPiiSensitive, ) @@ -394,18 +282,20 @@ class ProfilerWorkflow(WorkflowStatusMixin): self.source_status.failed(name, error, traceback.format_exc()) try: # if we fail to instantiate a profiler_interface, we won't have a profiler_interface variable + # we'll also catch scenarios where we don't have an interface set self.source_status.fail_all( - profiler_interface.processor_status.failures + profiler_source.interface.processor_status.failures ) self.source_status.records.extend( - profiler_interface.processor_status.records + profiler_source.interface.processor_status.records ) - except UnboundLocalError: + except (UnboundLocalError, AttributeError): pass else: - self.source_status.fail_all(profiler_interface.processor_status.failures) + # at this point we know we have an interface variable since we the `try` block above didn't raise + self.source_status.fail_all(profiler_source.interface.processor_status.failures) # type: ignore self.source_status.records.extend( - profiler_interface.processor_status.records + 
profiler_source.interface.processor_status.records # type: ignore ) return profile @@ -419,18 +309,14 @@ class ProfilerWorkflow(WorkflowStatusMixin): try: for database in self.get_database_entities(): - copied_service_config = self.copy_service_config(database) - sqa_metadata = ( - MetaData() - if not isinstance( - copied_service_config, NON_SQA_DATABASE_CONNECTIONS - ) - else None - ) # we only need this for sqlalchemy based services + profiler_source = profiler_source_factory.create( + self.config.source.type.lower(), + self.config, + database, + self.metadata, + ) for entity in self.get_table_entities(database=database): - profile = self.run_profiler( - entity, copied_service_config, sqa_metadata - ) + profile = self.run_profiler(entity, profiler_source) if hasattr(self, "sink") and profile: self.sink.write_record(profile) # At the end of the `execute`, update the associated Ingestion Pipeline status as success diff --git a/ingestion/src/metadata/profiler/source/__init__.py b/ingestion/src/metadata/profiler/source/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/profiler/source/base_profiler_source.py b/ingestion/src/metadata/profiler/source/base_profiler_source.py new file mode 100644 index 00000000000..379eab31e3a --- /dev/null +++ b/ingestion/src/metadata/profiler/source/base_profiler_source.py @@ -0,0 +1,228 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+"""
+Base source for the profiler used to instantiate a profiler runner with
+its interface
+"""
+from copy import deepcopy
+from typing import List, Optional, Union, cast
+
+from sqlalchemy import MetaData
+
+from metadata.generated.schema.entity.data.table import ColumnProfilerConfig, Table
+from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
+    DatalakeConnection,
+)
+from metadata.generated.schema.entity.services.databaseService import (
+    DatabaseConnection,
+    DatabaseService,
+)
+from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
+    DatabaseServiceProfilerPipeline,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.profiler.api.models import ProfilerProcessorConfig, TableConfig
+from metadata.profiler.interface.pandas.pandas_profiler_interface import (
+    PandasProfilerInterface,
+)
+from metadata.profiler.interface.profiler_protocol import ProfilerProtocol
+from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import (
+    SQAProfilerInterface,
+)
+from metadata.profiler.metrics.registry import Metrics
+from metadata.profiler.processor.core import Profiler
+from metadata.profiler.processor.default import DefaultProfiler, get_default_metrics
+
+NON_SQA_DATABASE_CONNECTIONS = (DatalakeConnection,)
+
+
+class BaseProfilerSource:
+    """
+    Base class for the profiler source. It keeps what is shared across the
+    tables of one database and builds the profiler runner of each table.
+    """
+
+    def __init__(
+        self,
+        config: OpenMetadataWorkflowConfig,
+        database: DatabaseService,
+        ometa_client: OpenMetadata,
+    ):
+        """
+        Args:
+            config: workflow config (service connection, source, processor)
+            database: entity whose name is set on the copied connection config.
+                NOTE(review): the workflow passes the items yielded by
+                `get_database_entities()`; confirm the annotation
+            ometa_client: client handed over to the profiler interface
+        """
+        self.service_conn_config = self._copy_service_config(config, database)
+        self.source_config = cast(
+            DatabaseServiceProfilerPipeline, config.source.sourceConfig.config
+        )  # satisfy type checker
+        self.profiler_config = ProfilerProcessorConfig.parse_obj(
+            config.processor.dict().get("config")
+        )
+        self.ometa_client = ometa_client
+        self.profiler_interface_type: str = self._get_profiler_interface_type(config)
+        self.sqa_metadata = self._set_sqa_metadata()
+        self._interface = None
+
+    @property
+    def interface(
+        self,
+    ) -> Optional[Union[SQAProfilerInterface, PandasProfilerInterface]]:
+        """Get the interface set by the last create_profiler_interface call"""
+        return self._interface
+
+    @interface.setter
+    def interface(self, interface):
+        """Set the interface"""
+        self._interface = interface
+
+    def _set_sqa_metadata(self) -> Optional[MetaData]:
+        """Get a new sqlalchemy MetaData, None for non sqlalchemy connections"""
+        if isinstance(self.service_conn_config, NON_SQA_DATABASE_CONNECTIONS):
+            return None
+        return MetaData()
+
+    def _get_profiler_interface_type(self, config) -> str:
+        """Get the name used to pick the profiler interface
+
+        Args:
+            config: workflow config holding the service connection
+        Returns:
+            str: class name of the copied connection config for non sqlalchemy
+                connections, of the service connection root otherwise
+        """
+        if isinstance(self.service_conn_config, NON_SQA_DATABASE_CONNECTIONS):
+            return self.service_conn_config.__class__.__name__
+        return config.source.serviceConnection.__root__.__class__.__name__
+
+    def _get_config_for_table(
+        self, entity: Table, profiler_config
+    ) -> Optional[TableConfig]:
+        """Get the first table config matching the entity FQN, None otherwise
+
+        Args:
+            entity: table entity
+            profiler_config: processor config whose `tableConfig` is searched
+        """
+        if not profiler_config.tableConfig:
+            return None
+        return next(
+            (
+                table_config
+                for table_config in profiler_config.tableConfig
+                if table_config.fullyQualifiedName.__root__
+                == entity.fullyQualifiedName.__root__  # type: ignore
+            ),
+            None,
+        )
+
+    def _get_include_columns(
+        self, entity, entity_config: Optional[TableConfig]
+    ) -> Optional[List[ColumnProfilerConfig]]:
+        """Get included columns, `entity_config` wins over the entity's own config"""
+        if entity_config and entity_config.columnConfig:
+            return entity_config.columnConfig.includeColumns
+        if entity.tableProfilerConfig:
+            return entity.tableProfilerConfig.includeColumns
+        return None
+
+    def _get_exclude_columns(
+        self, entity, entity_config: Optional[TableConfig]
+    ) -> Optional[List[str]]:
+        """Get excluded columns, `entity_config` wins over the entity's own config"""
+        if entity_config and entity_config.columnConfig:
+            return entity_config.columnConfig.excludeColumns
+        if entity.tableProfilerConfig:
+            return entity.tableProfilerConfig.excludeColumns
+        return None
+
+    def _copy_service_config(
+        self, config: OpenMetadataWorkflowConfig, database: DatabaseService
+    ) -> DatabaseConnection:
+        """Make a copy of the service config and update the database name
+
+        Args:
+            config: workflow config holding the service connection
+            database: entity whose name is set as `database` and `catalog`
+                when the connection has `supportsDatabase` and those attributes
+
+        Returns:
+            DatabaseConnection: the copy, `config` itself is left untouched
+        """
+        config_copy = deepcopy(
+            config.source.serviceConnection.__root__.config  # type: ignore
+        )
+        if hasattr(config_copy, "supportsDatabase"):
+            if hasattr(config_copy, "database"):
+                config_copy.database = database.name.__root__  # type: ignore
+            if hasattr(config_copy, "catalog"):
+                config_copy.catalog = database.name.__root__  # type: ignore
+
+        # we know we'll only be working with DatabaseConnection, we cast the type to satisfy type checker
+        return cast(DatabaseConnection, config_copy)
+
+    def create_profiler_interface(
+        self, entity: Table, table_config: Optional[TableConfig]
+    ) -> Union[SQAProfilerInterface, PandasProfilerInterface]:
+        """Create the profiler interface of a table and keep it in `interface`
+
+        Args:
+            entity: table entity to profile
+            table_config: workflow config of that table, if any
+        """
+        # Reset first: if the creation below raises, `interface` must not keep
+        # pointing at the previous table, whose status would be reported again
+        self.interface = None
+        self.interface = ProfilerProtocol.create(
+            self.profiler_interface_type,
+            entity,
+            table_config,
+            self.source_config,
+            self.service_conn_config,
+            self.ometa_client,
+            sqa_metadata=self.sqa_metadata,
+        )  # type: ignore
+        return self.interface
+
+    def get_profiler_runner(
+        self, entity: Table, profiler_config: ProfilerProcessorConfig
+    ) -> Profiler:
+        """
+        Returns the runner for the profiler: DefaultProfiler if no profiler is
+        configured, else a Profiler with its metrics (table defaults if empty)
+        """
+        table_config = self._get_config_for_table(entity, profiler_config)
+        profiler_interface = self.create_profiler_interface(entity, table_config)
+        if not profiler_config.profiler:
+            return DefaultProfiler(
+                profiler_interface=profiler_interface,
+                include_columns=self._get_include_columns(entity, table_config),
+                exclude_columns=self._get_exclude_columns(entity, table_config),
+            )
+        metrics = (
+            [Metrics.get(name) for name in profiler_config.profiler.metrics]
+            if profiler_config.profiler.metrics
+            else get_default_metrics(profiler_interface.table)
+        )
+        return Profiler(
+            *metrics,  # type: ignore
+            profiler_interface=profiler_interface,
+            include_columns=self._get_include_columns(entity, table_config),
+            exclude_columns=self._get_exclude_columns(entity, table_config),
+        )
diff --git a/ingestion/src/metadata/profiler/source/bigquery/__init__.py b/ingestion/src/metadata/profiler/source/bigquery/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/ingestion/src/metadata/profiler/source/bigquery/profiler_source.py b/ingestion/src/metadata/profiler/source/bigquery/profiler_source.py
new file mode 100644
index 00000000000..300d550044b
--- /dev/null
+++ b/ingestion/src/metadata/profiler/source/bigquery/profiler_source.py
@@ -0,0 +1,61 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Bigquery Profiler source
+"""
+
+from copy import deepcopy
+
+from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
+    BigQueryConnection,
+)
+from metadata.generated.schema.entity.services.databaseService import DatabaseService
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.generated.schema.security.credentials.gcsValues import (
+    GcsCredentialsValues,
+    MultipleProjectId,
+    SingleProjectId,
+)
+from metadata.profiler.source.base_profiler_source import BaseProfilerSource
+
+
+class BigQueryProfilerSource(BaseProfilerSource):
+    """Profiler source that adapts the BigQuery connection config per database"""
+
+    def _copy_service_config(
+        self, config: OpenMetadataWorkflowConfig, database: DatabaseService
+    ) -> BigQueryConnection:
+        """Copy the service connection config and pin it to a single project.
+
+        When the copied credentials hold GCS values whose project id is a
+        MultipleProjectId, it is swapped for a SingleProjectId carrying the name
+        of `database`. Any other configuration is returned as copied.
+
+        Args:
+            config: workflow configuration holding the service connection
+            database: entity whose name is used as the single project id
+
+        Returns:
+            BigQueryConnection: the adjusted copy; `config` itself is not modified
+        """
+        connection: BigQueryConnection = deepcopy(
+            config.source.serviceConnection.__root__.config  # type: ignore
+        )
+        gcs_config = connection.credentials.gcsConfig
+
+        if not isinstance(gcs_config, GcsCredentialsValues):
+            return connection
+        if isinstance(gcs_config.projectId, MultipleProjectId):
+            gcs_config.projectId = SingleProjectId(__root__=database.name.__root__)
+        return connection
diff --git a/ingestion/src/metadata/profiler/source/profiler_source_factory.py b/ingestion/src/metadata/profiler/source/profiler_source_factory.py
new file mode 100644
index 00000000000..60a35f12234
--- /dev/null
+++ b/ingestion/src/metadata/profiler/source/profiler_source_factory.py
@@ -0,0 +1,45 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Factory class for creating profiler source objects
+"""
+
+from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
+    BigqueryType,
+)
+from metadata.profiler.source.base_profiler_source import BaseProfilerSource
+from metadata.profiler.source.bigquery.profiler_source import BigQueryProfilerSource
+
+
+class ProfilerSourceFactory:
+    """Registry mapping a source type name to its profiler source class"""
+
+    def __init__(self):
+        # "base" is the fallback used by `create` for unregistered types
+        self._source_type = {"base": BaseProfilerSource}
+
+    def register_source(self, source_type: str, source_class):
+        """Map `source_type` to `source_class`, replacing any previous entry"""
+        self._source_type[source_type] = source_class
+
+    def create(self, source_type: str, *args, **kwargs) -> BaseProfilerSource:
+        """Instantiate the class registered for `source_type` with the given
+        arguments, falling back to the "base" class for unknown types"""
+        source_class = self._source_type.get(source_type) or self._source_type["base"]
+        return source_class(*args, **kwargs)
+
+
+# Keys are lower-cased: the workflow looks a source up with its lower-cased type
+profiler_source_factory = ProfilerSourceFactory()
+profiler_source_factory.register_source(
+    BigqueryType.BigQuery.value.lower(), BigQueryProfilerSource
+)
diff --git a/ingestion/tests/unit/profiler/test_workflow.py b/ingestion/tests/unit/profiler/test_workflow.py
index d01468d27b0..1ee83537c74 100644
--- a/ingestion/tests/unit/profiler/test_workflow.py
+++ b/ingestion/tests/unit/profiler/test_workflow.py
@@ -30,7 +30,11 @@ from metadata.generated.schema.entity.data.table import (
 from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
     OpenMetadataConnection,
 )
-from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
+from metadata.generated.schema.entity.services.databaseService import (
+    DatabaseConnection,
+    DatabaseService,
+    DatabaseServiceType,
+)
 from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
     DatabaseServiceProfilerPipeline,
 )
@@ -42,6 +46,7 @@ from 
metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import ( SQAProfilerInterface, ) from metadata.profiler.processor.default import DefaultProfiler +from metadata.profiler.source.base_profiler_source import BaseProfilerSource TABLE = Table( id=uuid.uuid4(), @@ -228,20 +233,21 @@ def test_profile_def(mocked_method, mocked_orm): # pylint: disable=unused-argum profile_workflow = ProfilerWorkflow.create(profile_config) mocked_method.assert_called() - profiler_interface: SQAProfilerInterface = ProfilerProtocol.create( - _profiler_type=DatabaseConnection.__name__, - entity=TABLE, - entity_config=profile_workflow.get_config_for_entity(TABLE), - source_config=profile_workflow.source_config, - service_connection_config=profile_workflow.config.source.serviceConnection.__root__.config, - ometa_client=None, - sqa_metadata=MetaData(), + profiler_source = BaseProfilerSource( + profile_workflow.config, + DatabaseService( + id=uuid.uuid4(), + name="myDataBaseService", + serviceType=DatabaseServiceType.SQLite, + ), # type: ignore + profile_workflow.metadata, + ) + profiler_runner = profiler_source.get_profiler_runner( + TABLE, profile_workflow.profiler_config ) - profile_workflow.create_profiler(TABLE, profiler_interface) - profiler_obj_metrics = [ - metric.name() for metric in profile_workflow.profiler.metrics - ] + # profile_workflow.create_profiler(TABLE, profiler_interface) + profiler_obj_metrics = [metric.name() for metric in profiler_runner.metrics] assert profile_workflow.profiler_config.profiler assert config_metrics_label == profiler_obj_metrics @@ -268,20 +274,21 @@ def test_default_profile_def( profile_workflow = ProfilerWorkflow.create(config) mocked_method.assert_called() - profiler_interface: SQAProfilerInterface = ProfilerProtocol.create( - _profiler_type=DatabaseConnection.__name__, - entity=TABLE, - entity_config=profile_workflow.get_config_for_entity(TABLE), - source_config=profile_workflow.source_config, - 
service_connection_config=profile_workflow.config.source.serviceConnection.__root__.config, - ometa_client=None, - sqa_metadata=MetaData(), + profiler_source = BaseProfilerSource( + profile_workflow.config, + DatabaseService( + id=uuid.uuid4(), + name="myDataBaseService", + serviceType=DatabaseServiceType.SQLite, + ), # type: ignore + profile_workflow.metadata, + ) + profiler_runner = profiler_source.get_profiler_runner( + TABLE, profile_workflow.profiler_config ) - profile_workflow.create_profiler(TABLE, profiler_interface) - assert isinstance( - profile_workflow.profiler, + profiler_runner, DefaultProfiler, )