diff --git a/ingestion/examples/airflow/dags/airflow_sample_data.py b/ingestion/examples/airflow/dags/airflow_sample_data.py index b8c293a63e6..4144d85b3ec 100644 --- a/ingestion/examples/airflow/dags/airflow_sample_data.py +++ b/ingestion/examples/airflow/dags/airflow_sample_data.py @@ -34,7 +34,7 @@ default_args = { config = """ source: - type: sample_data + type: sample-data serviceName: sample_data_ingestion serviceConnection: config: diff --git a/ingestion/src/metadata/config/workflow.py b/ingestion/src/metadata/config/workflow.py deleted file mode 100644 index fa288db653b..00000000000 --- a/ingestion/src/metadata/config/workflow.py +++ /dev/null @@ -1,119 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-""" -Workflow related configurations and utilities -""" -import importlib -import logging -from typing import Optional, Type, TypeVar - -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.generated.schema.metadataIngestion.workflow import ( - Processor as WorkflowProcessor, -) -from metadata.generated.schema.metadataIngestion.workflow import Sink as WorkflowSink -from metadata.ingestion.api.processor import Processor -from metadata.ingestion.api.sink import Sink - -logger = logging.getLogger("Config") - -T = TypeVar("T") - - -def fetch_type_class(_type: str, is_file: bool): - """ - Helper function to build the path for - dynamic imports - """ - if is_file: - return _type.replace("-", "_") - return "".join([i.title() for i in _type.replace("-", "_").split("_")]) - - -def get_class(key: str) -> Optional[Type[T]]: - """ - Given an import key, import the class and return it - """ - if key.find(".") >= 0: - # If the key contains a dot, we treat it as an import path and attempt - # to load it dynamically. - module_name, class_name = key.rsplit(".", 1) - my_class = getattr(importlib.import_module(module_name), class_name) - return my_class - - return None - - -def get_sink( - sink_type: str, - sink_config: WorkflowSink, - metadata_config: OpenMetadataConnection, - _from: str = "ingestion", -) -> Sink: - """ - Helps us to fetch and importing the sink class. - - By default, we will pick it up from `ingestion`. - - :param sink_type: Type specified in the config, e.g., metadata-rest - :param sink_config: Specific sink configurations, such as the host - :param metadata_config: Metadata server configurations - :param _from: From where do we load the sink class. Ingestion by default. - """ - sink_class = get_class( - f"metadata.{_from}.sink.{fetch_type_class(sink_type, is_file=True)}." 
- f"{fetch_type_class(sink_type, is_file=False)}Sink" - ) - - sink: Sink = sink_class.create( - sink_config.dict().get("config", {}), metadata_config - ) - - logger.debug(f"Sink type: {sink_type}, {sink_class} configured") - - return sink - - -def get_processor( - processor_type: str, - processor_config: WorkflowProcessor, - metadata_config: OpenMetadataConnection, - _from: str = "ingestion", - **kwargs, -) -> Processor: - """ - Helps us to fetch and import the Processor class. - - By default, we will pick it up from `ingestion` - - We allow to pass any other specific object we may require. - E.g., for the ORM Profiler we need a Session to reach - the source tables. - - :param processor_type: Type specified in the config, e.g., metadata-rest - :param processor_config: Specific Processor configurations, such as the profiler and tests - :param metadata_config: Metadata server configurations - :param _from: From where do we load the sink class. Ingestion by default. - """ - processor_class = get_class( - f"metadata.{_from}.processor.{fetch_type_class(processor_type, is_file=True)}." 
- f"{fetch_type_class(processor_type, is_file=False)}Processor" - ) - - processor: Processor = processor_class.create( - processor_config.dict().get("config", {}), metadata_config, **kwargs - ) - - logger.debug(f"Sink type: {processor_type}, {processor_class} configured") - - return processor diff --git a/ingestion/src/metadata/data_insight/api/workflow.py b/ingestion/src/metadata/data_insight/api/workflow.py index a610f0bd27c..e5b584d8067 100644 --- a/ingestion/src/metadata/data_insight/api/workflow.py +++ b/ingestion/src/metadata/data_insight/api/workflow.py @@ -27,7 +27,6 @@ from typing import Optional, Union, cast from pydantic import ValidationError from metadata.config.common import WorkflowExecutionError -from metadata.config.workflow import get_sink from metadata.data_insight.helper.data_insight_es_index import DataInsightEsIndex from metadata.data_insight.processor.data_processor import DataProcessor from metadata.data_insight.processor.entity_report_data_processor import ( @@ -55,6 +54,7 @@ from metadata.ingestion.api.parser import parse_workflow_config_gracefully from metadata.ingestion.api.processor import ProcessorStatus from metadata.ingestion.ometa.ometa_api import EntityList, OpenMetadata from metadata.ingestion.sink.elasticsearch import ElasticsearchSink +from metadata.utils.importer import get_sink from metadata.utils.logger import data_insight_logger from metadata.utils.time_utils import ( get_beginning_of_day_timestamp_mill, @@ -101,14 +101,14 @@ class DataInsightWorkflow(WorkflowStatusMixin): sink_type="metadata-rest", sink_config=Sink(type="metadata-rest", config={}), # type: ignore metadata_config=self.metadata_config, - _from="data_insight", + from_="data_insight", ) self.es_sink = get_sink( sink_type=self.config.sink.type, sink_config=self.config.sink, metadata_config=self.metadata_config, - _from="ingestion", + from_="ingestion", ) self.es_sink = cast(ElasticsearchSink, self.es_sink) diff --git 
a/ingestion/src/metadata/ingestion/api/workflow.py b/ingestion/src/metadata/ingestion/api/workflow.py index 92dff907025..a2d9d96f96f 100644 --- a/ingestion/src/metadata/ingestion/api/workflow.py +++ b/ingestion/src/metadata/ingestion/api/workflow.py @@ -11,12 +11,8 @@ """ Workflow definition for metadata related ingestions: metadata, lineage and usage. """ -import importlib import traceback - -# module building strings read better with .format instead of f-strings -# pylint: disable=consider-using-f-string -from typing import Optional, Type, TypeVar, cast +from typing import Optional, TypeVar, cast from metadata.config.common import WorkflowExecutionError from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( @@ -43,6 +39,14 @@ from metadata.utils.class_helper import ( get_service_class_from_service_type, get_service_type_from_source_type, ) +from metadata.utils.importer import ( + import_bulk_sink_type, + import_class, + import_processor_class, + import_sink_class, + import_source_class, + import_stage_class, +) from metadata.utils.logger import ingestion_logger, set_loggers_level from metadata.utils.workflow_output_handler import print_status from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin @@ -88,8 +92,10 @@ class Workflow(WorkflowStatusMixin): set_loggers_level(config.workflowConfig.loggerLevel.value) + # Source that we are ingesting, e.g., mysql, looker or kafka source_type = self.config.source.type.lower() + # Type of the source: Database, Dashboard, Messaging, Pipeline, Metadata or Mlmodel service_type: ServiceType = get_service_type_from_source_type( self.config.source.type ) @@ -106,14 +112,12 @@ class Workflow(WorkflowStatusMixin): logger.info(f"Service type:{service_type},{source_type} configured") - source_class = self.get( - self.config.source.serviceConnection.__root__.config.sourcePythonClass - if source_type.startswith("custom") - else 
"metadata.ingestion.source.{}.{}.{}Source".format( - service_type.name.lower(), - self.type_class_fetch(source_type, True), - self.type_class_fetch(source_type, False), + source_class = ( + import_class( + self.config.source.serviceConnection.__root__.config.sourcePythonClass ) + if source_type.startswith("custom") + else import_source_class(service_type=service_type, source_type=source_type) ) self.source: Source = source_class.create( @@ -125,12 +129,7 @@ class Workflow(WorkflowStatusMixin): if self.config.processor: processor_type = self.config.processor.type - processor_class = self.get( - "metadata.ingestion.processor.{}.{}Processor".format( - self.type_class_fetch(processor_type, True), - self.type_class_fetch(processor_type, False), - ) - ) + processor_class = import_processor_class(processor_type=processor_type) processor_config = self.config.processor.dict().get("config", {}) self.processor: Processor = processor_class.create( processor_config, metadata_config @@ -141,36 +140,21 @@ class Workflow(WorkflowStatusMixin): if self.config.stage: stage_type = self.config.stage.type - stage_class = self.get( - "metadata.ingestion.stage.{}.{}Stage".format( - self.type_class_fetch(stage_type, True), - self.type_class_fetch(stage_type, False), - ) - ) + stage_class = import_stage_class(stage_type=stage_type) stage_config = self.config.stage.dict().get("config", {}) self.stage: Stage = stage_class.create(stage_config, metadata_config) logger.debug(f"Stage Type: {stage_type}, {stage_class} configured") if self.config.sink: sink_type = self.config.sink.type - sink_class = self.get( - "metadata.ingestion.sink.{}.{}Sink".format( - self.type_class_fetch(sink_type, True), - self.type_class_fetch(sink_type, False), - ) - ) + sink_class = import_sink_class(sink_type=sink_type) sink_config = self.config.sink.dict().get("config", {}) self.sink: Sink = sink_class.create(sink_config, metadata_config) logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured") if 
self.config.bulkSink: bulk_sink_type = self.config.bulkSink.type - bulk_sink_class = self.get( - "metadata.ingestion.bulksink.{}.{}BulkSink".format( - self.type_class_fetch(bulk_sink_type, True), - self.type_class_fetch(bulk_sink_type, False), - ) - ) + bulk_sink_class = import_bulk_sink_type(bulk_sink_type=bulk_sink_type) bulk_sink_config = self.config.bulkSink.dict().get("config", {}) self.bulk_sink: BulkSink = bulk_sink_class.create( bulk_sink_config, metadata_config @@ -189,22 +173,6 @@ class Workflow(WorkflowStatusMixin): return self._timer - def type_class_fetch(self, type_: str, is_file: bool): - if is_file: - return type_.replace("-", "_") - - return "".join([i.title() for i in type_.replace("-", "_").split("_")]) - - def get(self, key: str) -> Optional[Type[T]]: - if key.find(".") >= 0: - # If the key contains a dot, we treat it as an import path and attempt - # to load it dynamically. - module_name, class_name = key.rsplit(".", 1) - my_class = getattr(importlib.import_module(module_name), class_name) - return my_class - - return None - @classmethod def create(cls, config_dict: dict) -> "Workflow": config = parse_workflow_config_gracefully(config_dict) diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py index 9ea2c808d4a..7f7530e096e 100644 --- a/ingestion/src/metadata/orm_profiler/api/workflow.py +++ b/ingestion/src/metadata/orm_profiler/api/workflow.py @@ -24,7 +24,6 @@ from pydantic import ValidationError from sqlalchemy import MetaData from metadata.config.common import WorkflowExecutionError -from metadata.config.workflow import get_sink from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import ( ColumnProfilerConfig, @@ -82,6 +81,7 @@ from metadata.utils.class_helper import ( ) from metadata.utils.connections import get_connection, test_connection from metadata.utils.filters import filter_by_database, filter_by_schema, 
filter_by_table +from metadata.utils.importer import get_sink from metadata.utils.logger import profiler_logger from metadata.utils.partition import get_partition_details from metadata.utils.workflow_output_handler import print_profiler_status @@ -128,7 +128,7 @@ class ProfilerWorkflow(WorkflowStatusMixin): sink_type=self.config.sink.type, sink_config=self.config.sink, metadata_config=self.metadata_config, - _from="orm_profiler", + from_="orm_profiler", ) if not self._validate_service_name(): diff --git a/ingestion/src/metadata/test_suite/api/workflow.py b/ingestion/src/metadata/test_suite/api/workflow.py index c466265819a..b317fa2f5b2 100644 --- a/ingestion/src/metadata/test_suite/api/workflow.py +++ b/ingestion/src/metadata/test_suite/api/workflow.py @@ -25,7 +25,6 @@ from pydantic import ValidationError from sqlalchemy import MetaData from metadata.config.common import WorkflowExecutionError -from metadata.config.workflow import get_sink from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest from metadata.generated.schema.entity.data.table import PartitionProfilerConfig, Table @@ -61,6 +60,7 @@ from metadata.test_suite.api.models import TestCaseDefinition, TestSuiteProcesso from metadata.test_suite.runner.core import DataTestsRunner from metadata.utils import entity_link from metadata.utils.connections import get_connection +from metadata.utils.importer import get_sink from metadata.utils.logger import test_suite_logger from metadata.utils.partition import get_partition_details from metadata.utils.workflow_output_handler import print_test_suite_status @@ -107,7 +107,7 @@ class TestSuiteWorkflow(WorkflowStatusMixin): sink_type=self.config.sink.type, sink_config=self.config.sink, metadata_config=self.metadata_config, - _from="test_suite", + from_="test_suite", ) @classmethod diff --git a/ingestion/src/metadata/utils/importer.py 
b/ingestion/src/metadata/utils/importer.py new file mode 100644 index 00000000000..97ef3eaeeb2 --- /dev/null +++ b/ingestion/src/metadata/utils/importer.py @@ -0,0 +1,150 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Helpers to import python classes and modules dynamically +""" +import importlib +import traceback +from typing import Type, TypeVar + +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.serviceType import ServiceType +from metadata.generated.schema.metadataIngestion.workflow import Sink as WorkflowSink +from metadata.ingestion.api.bulk_sink import BulkSink +from metadata.ingestion.api.processor import Processor +from metadata.ingestion.api.sink import Sink +from metadata.ingestion.api.source import Source +from metadata.ingestion.api.stage import Stage +from metadata.utils.logger import utils_logger + +logger = utils_logger() + +T = TypeVar("T") +TYPE_SEPARATOR = "-" +CLASS_SEPARATOR = "_" +MODULE_SEPARATOR = "." 
+ + +class ImportClassException(Exception): + """ + Raise it when having issues dynamically importing classes + """ + + +def get_module_name(type_: str) -> str: + """ + Build the module name in the ingestion package + from a source type, e.g., mysql or clickhouse-lineage + -> clickhouse_lineage + """ + return type_.replace(TYPE_SEPARATOR, CLASS_SEPARATOR) + + +def get_class_name(type_: str) -> str: + """ + Build the class name in the ingestion package + from a source type. Both "-" and "_" split words, + e.g., clickhouse-lineage or clickhouse_lineage -> ClickhouseLineage + """ + return "".join([i.title() for i in get_module_name(type_).split(CLASS_SEPARATOR)]) + + +def import_class(key: str) -> Type[T]: + """ + Dynamically import a class from a module path + """ + + try: + module_name, class_name = key.rsplit(MODULE_SEPARATOR, 1) + my_class = getattr(importlib.import_module(module_name), class_name) + return my_class + except Exception as err: + logger.debug(traceback.format_exc()) + raise ImportClassException(f"Cannot load class from {key} due to {err}") + + +# module building strings read better with .format instead of f-strings +# pylint: disable=consider-using-f-string +def import_source_class( + service_type: ServiceType, source_type: str, from_: str = "ingestion" +) -> Type[Source]: + return import_class( + "metadata.{}.source.{}.{}.{}Source".format( + from_, + service_type.name.lower(), + get_module_name(source_type), + get_class_name(source_type), + ) + ) + + +def import_processor_class( + processor_type: str, from_: str = "ingestion" +) -> Type[Processor]: + return import_class( + "metadata.{}.processor.{}.{}Processor".format( + from_, + get_module_name(processor_type), + get_class_name(processor_type), + ) + ) + + +def import_stage_class(stage_type: str, from_: str = "ingestion") -> Type[Stage]: + return import_class( + "metadata.{}.stage.{}.{}Stage".format( + from_, + get_module_name(stage_type), + get_class_name(stage_type), + ) + ) + + +def import_sink_class(sink_type: str, from_: str = "ingestion") -> 
Type[Sink]: + return import_class( + "metadata.{}.sink.{}.{}Sink".format( + from_, + get_module_name(sink_type), + get_class_name(sink_type), + ) + ) + + +def import_bulk_sink_type( + bulk_sink_type: str, from_: str = "ingestion" +) -> Type[BulkSink]: + return import_class( + "metadata.{}.bulksink.{}.{}BulkSink".format( + from_, + get_module_name(bulk_sink_type), + get_class_name(bulk_sink_type), + ) + ) + + +def get_sink( + sink_type: str, + sink_config: WorkflowSink, + metadata_config: OpenMetadataConnection, + from_: str = "ingestion", +) -> Sink: + """ + Import the sink class and create it + from the given configs + """ + sink_class = import_sink_class(sink_type=sink_type, from_=from_) + sink_config = sink_config.dict().get("config", {}) + sink: Sink = sink_class.create(sink_config, metadata_config) + logger.debug(f"Sink type:{sink_type}, {sink_class} configured") + + return sink diff --git a/ingestion/tests/unit/test_importer.py b/ingestion/tests/unit/test_importer.py new file mode 100644 index 00000000000..ae4988cf0f2 --- /dev/null +++ b/ingestion/tests/unit/test_importer.py @@ -0,0 +1,106 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Test import utilities +""" +from unittest import TestCase + +from metadata.generated.schema.entity.services.serviceType import ServiceType +from metadata.utils.importer import ( + get_class_name, + get_module_name, + import_bulk_sink_type, + import_class, + import_processor_class, + import_sink_class, + import_source_class, + import_stage_class, +) + + +# pylint: disable=import-outside-toplevel +class ImporterTest(TestCase): + """ + Validate that we properly convert + module paths and load classes. + """ + + def test_get_module_name(self) -> None: + self.assertEqual(get_module_name("mysql"), "mysql") + self.assertEqual(get_module_name("redshift-usage"), "redshift_usage") + + def test_get_class_name(self) -> None: + self.assertEqual(get_class_name("mysql"), "Mysql") + self.assertEqual(get_class_name("redshift-usage"), "RedshiftUsage") + + def test_import_class(self) -> None: + from metadata.ingestion.source.database.mysql import ( + MysqlSource, # pylint: disable=import-outside-toplevel + ) + + self.assertEqual( + import_class("metadata.ingestion.source.database.mysql.MysqlSource"), + MysqlSource, + ) + + def test_import_source_class(self) -> None: + from metadata.ingestion.source.database.mysql import MysqlSource + from metadata.ingestion.source.database.snowflake import SnowflakeSource + + self.assertEqual( + import_source_class(service_type=ServiceType.Database, source_type="mysql"), + MysqlSource, + ) + + self.assertEqual( + import_source_class( + service_type=ServiceType.Database, source_type="snowflake" + ), + SnowflakeSource, + ) + + def test_import_processor_class(self) -> None: + from metadata.ingestion.processor.query_parser import QueryParserProcessor + + self.assertEqual( + import_processor_class(processor_type="query-parser"), + QueryParserProcessor, + ) + + def test_import_stage_class(self) -> None: + + from metadata.ingestion.stage.table_usage import TableUsageStage + + self.assertEqual(import_stage_class(stage_type="table-usage"), 
TableUsageStage) + + def test_import_sink_class(self) -> None: + + from metadata.ingestion.sink.metadata_rest import MetadataRestSink + + self.assertEqual(import_sink_class(sink_type="metadata-rest"), MetadataRestSink) + + def test_import_bulk_sink_type(self) -> None: + + from metadata.ingestion.bulksink.metadata_usage import MetadataUsageBulkSink + + self.assertEqual( + import_bulk_sink_type(bulk_sink_type="metadata-usage"), + MetadataUsageBulkSink, + ) + + def test_import_sink_from(self) -> None: + from metadata.orm_profiler.sink.metadata_rest import MetadataRestSink + + self.assertEqual( + import_sink_class(sink_type="metadata-rest", from_="orm_profiler"), + MetadataRestSink, + )