Organise dynamic importers (#9513)

Organise dynamic importers (#9513)
This commit is contained in:
Pere Miquel Brull 2022-12-23 12:17:06 +01:00 committed by GitHub
parent 82619bb05b
commit ec6ebb3694
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 284 additions and 179 deletions

View File

@ -34,7 +34,7 @@ default_args = {
config = """ config = """
source: source:
type: sample_data type: sample-data
serviceName: sample_data_ingestion serviceName: sample_data_ingestion
serviceConnection: serviceConnection:
config: config:

View File

@ -1,119 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Workflow related configurations and utilities
"""
import importlib
import logging
from typing import Optional, Type, TypeVar
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Processor as WorkflowProcessor,
)
from metadata.generated.schema.metadataIngestion.workflow import Sink as WorkflowSink
from metadata.ingestion.api.processor import Processor
from metadata.ingestion.api.sink import Sink
logger = logging.getLogger("Config")
T = TypeVar("T")
def fetch_type_class(_type: str, is_file: bool):
"""
Helper function to build the path for
dynamic imports
"""
if is_file:
return _type.replace("-", "_")
return "".join([i.title() for i in _type.replace("-", "_").split("_")])
def get_class(key: str) -> Optional[Type[T]]:
"""
Given an import key, import the class and return it
"""
if key.find(".") >= 0:
# If the key contains a dot, we treat it as an import path and attempt
# to load it dynamically.
module_name, class_name = key.rsplit(".", 1)
my_class = getattr(importlib.import_module(module_name), class_name)
return my_class
return None
def get_sink(
sink_type: str,
sink_config: WorkflowSink,
metadata_config: OpenMetadataConnection,
_from: str = "ingestion",
) -> Sink:
"""
Helps us to fetch and importing the sink class.
By default, we will pick it up from `ingestion`.
:param sink_type: Type specified in the config, e.g., metadata-rest
:param sink_config: Specific sink configurations, such as the host
:param metadata_config: Metadata server configurations
:param _from: From where do we load the sink class. Ingestion by default.
"""
sink_class = get_class(
f"metadata.{_from}.sink.{fetch_type_class(sink_type, is_file=True)}."
f"{fetch_type_class(sink_type, is_file=False)}Sink"
)
sink: Sink = sink_class.create(
sink_config.dict().get("config", {}), metadata_config
)
logger.debug(f"Sink type: {sink_type}, {sink_class} configured")
return sink
def get_processor(
processor_type: str,
processor_config: WorkflowProcessor,
metadata_config: OpenMetadataConnection,
_from: str = "ingestion",
**kwargs,
) -> Processor:
"""
Helps us to fetch and import the Processor class.
By default, we will pick it up from `ingestion`
We allow to pass any other specific object we may require.
E.g., for the ORM Profiler we need a Session to reach
the source tables.
:param processor_type: Type specified in the config, e.g., metadata-rest
:param processor_config: Specific Processor configurations, such as the profiler and tests
:param metadata_config: Metadata server configurations
:param _from: From where do we load the sink class. Ingestion by default.
"""
processor_class = get_class(
f"metadata.{_from}.processor.{fetch_type_class(processor_type, is_file=True)}."
f"{fetch_type_class(processor_type, is_file=False)}Processor"
)
processor: Processor = processor_class.create(
processor_config.dict().get("config", {}), metadata_config, **kwargs
)
logger.debug(f"Sink type: {processor_type}, {processor_class} configured")
return processor

View File

@ -27,7 +27,6 @@ from typing import Optional, Union, cast
from pydantic import ValidationError from pydantic import ValidationError
from metadata.config.common import WorkflowExecutionError from metadata.config.common import WorkflowExecutionError
from metadata.config.workflow import get_sink
from metadata.data_insight.helper.data_insight_es_index import DataInsightEsIndex from metadata.data_insight.helper.data_insight_es_index import DataInsightEsIndex
from metadata.data_insight.processor.data_processor import DataProcessor from metadata.data_insight.processor.data_processor import DataProcessor
from metadata.data_insight.processor.entity_report_data_processor import ( from metadata.data_insight.processor.entity_report_data_processor import (
@ -55,6 +54,7 @@ from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.processor import ProcessorStatus from metadata.ingestion.api.processor import ProcessorStatus
from metadata.ingestion.ometa.ometa_api import EntityList, OpenMetadata from metadata.ingestion.ometa.ometa_api import EntityList, OpenMetadata
from metadata.ingestion.sink.elasticsearch import ElasticsearchSink from metadata.ingestion.sink.elasticsearch import ElasticsearchSink
from metadata.utils.importer import get_sink
from metadata.utils.logger import data_insight_logger from metadata.utils.logger import data_insight_logger
from metadata.utils.time_utils import ( from metadata.utils.time_utils import (
get_beginning_of_day_timestamp_mill, get_beginning_of_day_timestamp_mill,
@ -101,14 +101,14 @@ class DataInsightWorkflow(WorkflowStatusMixin):
sink_type="metadata-rest", sink_type="metadata-rest",
sink_config=Sink(type="metadata-rest", config={}), # type: ignore sink_config=Sink(type="metadata-rest", config={}), # type: ignore
metadata_config=self.metadata_config, metadata_config=self.metadata_config,
_from="data_insight", from_="data_insight",
) )
self.es_sink = get_sink( self.es_sink = get_sink(
sink_type=self.config.sink.type, sink_type=self.config.sink.type,
sink_config=self.config.sink, sink_config=self.config.sink,
metadata_config=self.metadata_config, metadata_config=self.metadata_config,
_from="ingestion", from_="ingestion",
) )
self.es_sink = cast(ElasticsearchSink, self.es_sink) self.es_sink = cast(ElasticsearchSink, self.es_sink)

View File

@ -11,12 +11,8 @@
""" """
Workflow definition for metadata related ingestions: metadata, lineage and usage. Workflow definition for metadata related ingestions: metadata, lineage and usage.
""" """
import importlib
import traceback import traceback
from typing import Optional, TypeVar, cast
# module building strings read better with .format instead of f-strings
# pylint: disable=consider-using-f-string
from typing import Optional, Type, TypeVar, cast
from metadata.config.common import WorkflowExecutionError from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
@ -43,6 +39,14 @@ from metadata.utils.class_helper import (
get_service_class_from_service_type, get_service_class_from_service_type,
get_service_type_from_source_type, get_service_type_from_source_type,
) )
from metadata.utils.importer import (
import_bulk_sink_type,
import_class,
import_processor_class,
import_sink_class,
import_source_class,
import_stage_class,
)
from metadata.utils.logger import ingestion_logger, set_loggers_level from metadata.utils.logger import ingestion_logger, set_loggers_level
from metadata.utils.workflow_output_handler import print_status from metadata.utils.workflow_output_handler import print_status
from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin
@ -88,8 +92,10 @@ class Workflow(WorkflowStatusMixin):
set_loggers_level(config.workflowConfig.loggerLevel.value) set_loggers_level(config.workflowConfig.loggerLevel.value)
# Source that we are ingesting, e.g., mysql, looker or kafka
source_type = self.config.source.type.lower() source_type = self.config.source.type.lower()
# Type of the source: Database, Dashboard, Messaging, Pipeline, Metadata or Mlmodel
service_type: ServiceType = get_service_type_from_source_type( service_type: ServiceType = get_service_type_from_source_type(
self.config.source.type self.config.source.type
) )
@ -106,14 +112,12 @@ class Workflow(WorkflowStatusMixin):
logger.info(f"Service type:{service_type},{source_type} configured") logger.info(f"Service type:{service_type},{source_type} configured")
source_class = self.get( source_class = (
self.config.source.serviceConnection.__root__.config.sourcePythonClass import_class(
if source_type.startswith("custom") self.config.source.serviceConnection.__root__.config.sourcePythonClass
else "metadata.ingestion.source.{}.{}.{}Source".format(
service_type.name.lower(),
self.type_class_fetch(source_type, True),
self.type_class_fetch(source_type, False),
) )
if source_type.startswith("custom")
else import_source_class(service_type=service_type, source_type=source_type)
) )
self.source: Source = source_class.create( self.source: Source = source_class.create(
@ -125,12 +129,7 @@ class Workflow(WorkflowStatusMixin):
if self.config.processor: if self.config.processor:
processor_type = self.config.processor.type processor_type = self.config.processor.type
processor_class = self.get( processor_class = import_processor_class(processor_type=processor_type)
"metadata.ingestion.processor.{}.{}Processor".format(
self.type_class_fetch(processor_type, True),
self.type_class_fetch(processor_type, False),
)
)
processor_config = self.config.processor.dict().get("config", {}) processor_config = self.config.processor.dict().get("config", {})
self.processor: Processor = processor_class.create( self.processor: Processor = processor_class.create(
processor_config, metadata_config processor_config, metadata_config
@ -141,36 +140,21 @@ class Workflow(WorkflowStatusMixin):
if self.config.stage: if self.config.stage:
stage_type = self.config.stage.type stage_type = self.config.stage.type
stage_class = self.get( stage_class = import_stage_class(stage_type=stage_type)
"metadata.ingestion.stage.{}.{}Stage".format(
self.type_class_fetch(stage_type, True),
self.type_class_fetch(stage_type, False),
)
)
stage_config = self.config.stage.dict().get("config", {}) stage_config = self.config.stage.dict().get("config", {})
self.stage: Stage = stage_class.create(stage_config, metadata_config) self.stage: Stage = stage_class.create(stage_config, metadata_config)
logger.debug(f"Stage Type: {stage_type}, {stage_class} configured") logger.debug(f"Stage Type: {stage_type}, {stage_class} configured")
if self.config.sink: if self.config.sink:
sink_type = self.config.sink.type sink_type = self.config.sink.type
sink_class = self.get( sink_class = import_sink_class(sink_type=sink_type)
"metadata.ingestion.sink.{}.{}Sink".format(
self.type_class_fetch(sink_type, True),
self.type_class_fetch(sink_type, False),
)
)
sink_config = self.config.sink.dict().get("config", {}) sink_config = self.config.sink.dict().get("config", {})
self.sink: Sink = sink_class.create(sink_config, metadata_config) self.sink: Sink = sink_class.create(sink_config, metadata_config)
logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured") logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured")
if self.config.bulkSink: if self.config.bulkSink:
bulk_sink_type = self.config.bulkSink.type bulk_sink_type = self.config.bulkSink.type
bulk_sink_class = self.get( bulk_sink_class = import_bulk_sink_type(bulk_sink_type=bulk_sink_type)
"metadata.ingestion.bulksink.{}.{}BulkSink".format(
self.type_class_fetch(bulk_sink_type, True),
self.type_class_fetch(bulk_sink_type, False),
)
)
bulk_sink_config = self.config.bulkSink.dict().get("config", {}) bulk_sink_config = self.config.bulkSink.dict().get("config", {})
self.bulk_sink: BulkSink = bulk_sink_class.create( self.bulk_sink: BulkSink = bulk_sink_class.create(
bulk_sink_config, metadata_config bulk_sink_config, metadata_config
@ -189,22 +173,6 @@ class Workflow(WorkflowStatusMixin):
return self._timer return self._timer
def type_class_fetch(self, type_: str, is_file: bool):
if is_file:
return type_.replace("-", "_")
return "".join([i.title() for i in type_.replace("-", "_").split("_")])
def get(self, key: str) -> Optional[Type[T]]:
if key.find(".") >= 0:
# If the key contains a dot, we treat it as an import path and attempt
# to load it dynamically.
module_name, class_name = key.rsplit(".", 1)
my_class = getattr(importlib.import_module(module_name), class_name)
return my_class
return None
@classmethod @classmethod
def create(cls, config_dict: dict) -> "Workflow": def create(cls, config_dict: dict) -> "Workflow":
config = parse_workflow_config_gracefully(config_dict) config = parse_workflow_config_gracefully(config_dict)

View File

@ -24,7 +24,6 @@ from pydantic import ValidationError
from sqlalchemy import MetaData from sqlalchemy import MetaData
from metadata.config.common import WorkflowExecutionError from metadata.config.common import WorkflowExecutionError
from metadata.config.workflow import get_sink
from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import ( from metadata.generated.schema.entity.data.table import (
ColumnProfilerConfig, ColumnProfilerConfig,
@ -82,6 +81,7 @@ from metadata.utils.class_helper import (
) )
from metadata.utils.connections import get_connection, test_connection from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.importer import get_sink
from metadata.utils.logger import profiler_logger from metadata.utils.logger import profiler_logger
from metadata.utils.partition import get_partition_details from metadata.utils.partition import get_partition_details
from metadata.utils.workflow_output_handler import print_profiler_status from metadata.utils.workflow_output_handler import print_profiler_status
@ -128,7 +128,7 @@ class ProfilerWorkflow(WorkflowStatusMixin):
sink_type=self.config.sink.type, sink_type=self.config.sink.type,
sink_config=self.config.sink, sink_config=self.config.sink,
metadata_config=self.metadata_config, metadata_config=self.metadata_config,
_from="orm_profiler", from_="orm_profiler",
) )
if not self._validate_service_name(): if not self._validate_service_name():

View File

@ -25,7 +25,6 @@ from pydantic import ValidationError
from sqlalchemy import MetaData from sqlalchemy import MetaData
from metadata.config.common import WorkflowExecutionError from metadata.config.common import WorkflowExecutionError
from metadata.config.workflow import get_sink
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.data.table import PartitionProfilerConfig, Table from metadata.generated.schema.entity.data.table import PartitionProfilerConfig, Table
@ -61,6 +60,7 @@ from metadata.test_suite.api.models import TestCaseDefinition, TestSuiteProcesso
from metadata.test_suite.runner.core import DataTestsRunner from metadata.test_suite.runner.core import DataTestsRunner
from metadata.utils import entity_link from metadata.utils import entity_link
from metadata.utils.connections import get_connection from metadata.utils.connections import get_connection
from metadata.utils.importer import get_sink
from metadata.utils.logger import test_suite_logger from metadata.utils.logger import test_suite_logger
from metadata.utils.partition import get_partition_details from metadata.utils.partition import get_partition_details
from metadata.utils.workflow_output_handler import print_test_suite_status from metadata.utils.workflow_output_handler import print_test_suite_status
@ -107,7 +107,7 @@ class TestSuiteWorkflow(WorkflowStatusMixin):
sink_type=self.config.sink.type, sink_type=self.config.sink.type,
sink_config=self.config.sink, sink_config=self.config.sink,
metadata_config=self.metadata_config, metadata_config=self.metadata_config,
_from="test_suite", from_="test_suite",
) )
@classmethod @classmethod

View File

@ -0,0 +1,150 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helpers to import python classes and modules dynamically
"""
import importlib
import traceback
from typing import Type, TypeVar
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.workflow import Sink as WorkflowSink
from metadata.ingestion.api.bulk_sink import BulkSink
from metadata.ingestion.api.processor import Processor
from metadata.ingestion.api.sink import Sink
from metadata.ingestion.api.source import Source
from metadata.ingestion.api.stage import Stage
from metadata.utils.logger import utils_logger
logger = utils_logger()
T = TypeVar("T")
TYPE_SEPARATOR = "-"
CLASS_SEPARATOR = "_"
MODULE_SEPARATOR = "."
class ImportClassException(Exception):
"""
Raise it when having issues dynamically importing classes
"""
def get_module_name(type_: str) -> str:
"""
Build the module name in the ingestion package
from a source type, e.g., mysql or clickhouse-lineage
-> clickhouse_lineage
"""
return type_.replace(TYPE_SEPARATOR, CLASS_SEPARATOR)
def get_class_name(type_: str) -> str:
"""
Build the class name in the ingestion package
from a source type, e.g., mysql or clickhouse-lineage
-> ClickhouseLineage
"""
return "".join([i.title() for i in type_.split(TYPE_SEPARATOR)])
def import_class(key: str) -> Type[T]:
"""
Dynamically import a class from a module path
"""
try:
module_name, class_name = key.rsplit(MODULE_SEPARATOR, 1)
my_class = getattr(importlib.import_module(module_name), class_name)
return my_class
except Exception as err:
logger.debug(traceback.format_exc())
raise ImportClassException(f"Cannot load class from {key} due to {err}")
# module building strings read better with .format instead of f-strings
# pylint: disable=consider-using-f-string
def import_source_class(
service_type: ServiceType, source_type: str, from_: str = "ingestion"
) -> Type[Source]:
return import_class(
"metadata.{}.source.{}.{}.{}Source".format(
from_,
service_type.name.lower(),
get_module_name(source_type),
get_class_name(source_type),
)
)
def import_processor_class(
processor_type: str, from_: str = "ingestion"
) -> Type[Processor]:
return import_class(
"metadata.{}.processor.{}.{}Processor".format(
from_,
get_module_name(processor_type),
get_class_name(processor_type),
)
)
def import_stage_class(stage_type: str, from_: str = "ingestion") -> Type[Stage]:
return import_class(
"metadata.{}.stage.{}.{}Stage".format(
from_,
get_module_name(stage_type),
get_class_name(stage_type),
)
)
def import_sink_class(sink_type: str, from_: str = "ingestion") -> Type[Sink]:
return import_class(
"metadata.{}.sink.{}.{}Sink".format(
from_,
get_module_name(sink_type),
get_class_name(sink_type),
)
)
def import_bulk_sink_type(
bulk_sink_type: str, from_: str = "ingestion"
) -> Type[BulkSink]:
return import_class(
"metadata.{}.bulksink.{}.{}BulkSink".format(
from_,
get_module_name(bulk_sink_type),
get_class_name(bulk_sink_type),
)
)
def get_sink(
sink_type: str,
sink_config: WorkflowSink,
metadata_config: OpenMetadataConnection,
from_: str = "ingestion",
) -> Sink:
"""
Import the sink class and create it
from the given configs
"""
sink_class = import_sink_class(sink_type=sink_type, from_=from_)
sink_config = sink_config.dict().get("config", {})
sink: Sink = sink_class.create(sink_config, metadata_config)
logger.debug(f"Sink type:{sink_type}, {sink_class} configured")
return sink

View File

@ -0,0 +1,106 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test import utilities
"""
from unittest import TestCase
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.utils.importer import (
get_class_name,
get_module_name,
import_bulk_sink_type,
import_class,
import_processor_class,
import_sink_class,
import_source_class,
import_stage_class,
)
# pylint: disable=import-outside-toplevel
class ImporterTest(TestCase):
"""
Validate that we properly convert
module paths and load classes.
"""
def test_get_module_name(self) -> None:
self.assertEqual(get_module_name("mysql"), "mysql")
self.assertEqual(get_module_name("redshift-usage"), "redshift_usage")
def test_get_class_name(self) -> None:
self.assertEqual(get_class_name("mysql"), "Mysql")
self.assertEqual(get_class_name("redshift-usage"), "RedshiftUsage")
def test_import_class(self) -> None:
from metadata.ingestion.source.database.mysql import (
MysqlSource, # pylint: disable=import-outside-toplevel
)
self.assertEqual(
import_class("metadata.ingestion.source.database.mysql.MysqlSource"),
MysqlSource,
)
def test_import_source_class(self) -> None:
from metadata.ingestion.source.database.mysql import MysqlSource
from metadata.ingestion.source.database.snowflake import SnowflakeSource
self.assertEqual(
import_source_class(service_type=ServiceType.Database, source_type="mysql"),
MysqlSource,
)
self.assertEqual(
import_source_class(
service_type=ServiceType.Database, source_type="snowflake"
),
SnowflakeSource,
)
def test_import_processor_class(self) -> None:
from metadata.ingestion.processor.query_parser import QueryParserProcessor
self.assertEqual(
import_processor_class(processor_type="query-parser"),
QueryParserProcessor,
)
def test_import_stage_class(self) -> None:
from metadata.ingestion.stage.table_usage import TableUsageStage
self.assertEqual(import_stage_class(stage_type="table-usage"), TableUsageStage)
def test_import_sink_class(self) -> None:
from metadata.ingestion.sink.metadata_rest import MetadataRestSink
self.assertEqual(import_sink_class(sink_type="metadata-rest"), MetadataRestSink)
def test_import_bulk_sink_type(self) -> None:
from metadata.ingestion.bulksink.metadata_usage import MetadataUsageBulkSink
self.assertEqual(
import_bulk_sink_type(bulk_sink_type="metadata-usage"),
MetadataUsageBulkSink,
)
def test_import_sink_from(self) -> None:
from metadata.orm_profiler.sink.metadata_rest import MetadataRestSink
self.assertEqual(
import_sink_class(sink_type="metadata-rest", from_="orm_profiler"),
MetadataRestSink,
)