2024-10-24 07:47:50 +02:00
|
|
|
"""
|
|
|
|
Manifests are used to store class information
|
|
|
|
"""
|
|
|
|
|
|
|
|
from typing import Optional, Type, cast
|
|
|
|
|
|
|
|
from pydantic import model_validator
|
|
|
|
|
2024-11-19 08:10:45 +01:00
|
|
|
from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
|
2024-10-24 07:47:50 +02:00
|
|
|
from metadata.generated.schema.entity.services.serviceType import ServiceType
|
|
|
|
from metadata.ingestion.api.steps import Source
|
2025-06-13 14:52:29 +02:00
|
|
|
from metadata.ingestion.connections.connection import BaseConnection
|
2024-10-24 07:47:50 +02:00
|
|
|
from metadata.ingestion.models.custom_pydantic import BaseModel
|
2024-11-19 08:10:45 +01:00
|
|
|
from metadata.profiler.interface.profiler_interface import ProfilerInterface
|
|
|
|
from metadata.sampler.sampler_interface import SamplerInterface
|
2024-10-24 07:47:50 +02:00
|
|
|
from metadata.utils.importer import (
|
|
|
|
TYPE_SEPARATOR,
|
2024-11-20 17:31:48 +01:00
|
|
|
DynamicImportException,
|
2024-10-24 07:47:50 +02:00
|
|
|
get_class_path,
|
|
|
|
get_module_dir,
|
|
|
|
import_from_module,
|
|
|
|
)
|
|
|
|
from metadata.utils.logger import utils_logger
|
|
|
|
|
|
|
|
# Module-level logger obtained from the shared `utils_logger` factory in metadata.utils.logger.
logger = utils_logger()
|
|
|
|
|
|
|
|
|
|
|
|
class BaseSpec(BaseModel):
    """
    # The OpenMetadata Ingestion Service Specification (Spec)

    This is the API for defining a service in OpenMetadata. It needs to be in the classpath of the connector in
    the form:

    metadata.ingestion.source.{service_type}.{service_name}.service_spec.ServiceSpec

    Example for postgres:

    metadata.ingestion.source.database.postgres.service_spec.ServiceSpec

    You can supply either strings with the full classpath or concrete classes that will be converted to strings.

    The use of strings for the values gives us a few advantages:
    1. Manifests can be defined using json/yaml and deserialized into this class.
    2. We can dynamically import the class when needed and avoid dependency issues.
    3. We avoid circular imports.
    4. We can hot-swap the class implementation without changing the manifest (example: for testing).
    """

    # Every field holds a fully-qualified class path. Any of them may also be given
    # as a class object, which `transform_fields` converts to its path string.
    profiler_class: Optional[str] = None
    test_suite_class: Optional[str] = None
    metadata_source_class: str
    lineage_source_class: Optional[str] = None
    usage_source_class: Optional[str] = None
    sampler_class: Optional[str] = None
    data_diff: Optional[str] = None
    connection_class: Optional[str] = None

    @model_validator(mode="before")
    @classmethod
    def transform_fields(cls, values):
        """Allow passing classes directly instead of their string class path.

        Any field value that is a class (an instance of ``type``) is replaced with
        the string returned by ``get_class_path``. A "before" validator receives the
        raw input, so non-dict input is passed through unchanged and left to
        pydantic's normal validation. The caller's dict is copied, not mutated.
        """
        if not isinstance(values, dict):
            return values
        converted = dict(values)
        for field in cls.model_fields:
            value = converted.get(field)
            if isinstance(value, type):
                converted[field] = get_class_path(value)
        return converted

    @classmethod
    def get_for_source(
        cls, service_type: ServiceType, source_type: str, from_: str = "ingestion"
    ) -> "BaseSpec":
        """Retrieve the manifest (``ServiceSpec``) for a given source type.

        The manifest is imported from
        ``metadata.{from_}.source.{service_type}.{module_dir}.service_spec.ServiceSpec``,
        where ``module_dir`` is derived from ``source_type`` via ``get_module_dir``.
        No fallback to a default manifest is attempted here. Callers that want one
        must handle the import error themselves.

        Args:
            service_type (ServiceType): The service type. Its lower-cased enum name
                is used as the package segment (e.g. ``database``).
            source_type (str): The source type.
            from_ (str, optional): The top-level package under ``metadata`` to
                import from. Defaults to "ingestion".

        Returns:
            BaseSpec: The manifest for the source type.

        Raises:
            Whatever ``import_from_module`` raises when the spec cannot be imported.
            Callers in this module catch ``DynamicImportException`` for this case.
        """
        module_path = (
            f"metadata.{from_}.source.{service_type.name.lower()}."
            f"{get_module_dir(source_type)}.service_spec.ServiceSpec"
        )
        return cls.model_validate(import_from_module(module_path))
|
|
|
|
|
|
|
|
|
|
|
|
def import_source_class(
    service_type: ServiceType, source_type: str, from_: str = "ingestion"
) -> Type[Source]:
    """Import the source class declared in the service spec for a source type.

    The suffix after the last ``TYPE_SEPARATOR`` in ``source_type`` selects which
    spec field is used:
    - ``usage`` -> ``usage_source_class``
    - ``lineage`` -> ``lineage_source_class``
    - anything else -> ``metadata_source_class``

    Args:
        service_type: The service type.
        source_type: The source type, optionally suffixed with a workflow kind.
        from_: Top-level package under ``metadata`` to import the spec from.

    Returns:
        The source class referenced by the selected spec field.

    Raises:
        ValueError: If the selected spec field is not set (e.g. a connector
            without usage or lineage support).
    """
    source_class_type = source_type.split(TYPE_SEPARATOR)[-1]
    if source_class_type in ("usage", "lineage"):
        field = f"{source_class_type}_source_class"
    else:
        field = "metadata_source_class"
    spec = BaseSpec.get_for_source(service_type, source_type, from_)
    # Read the single attribute instead of serializing the whole spec via model_dump().
    class_path = getattr(spec, field)
    if not class_path:
        # Consistent with the other import_* helpers: fail clearly instead of
        # passing None into import_from_module.
        raise ValueError(
            f"Source class ({field}) not found for service type {service_type} and source type {source_type}"
        )
    return cast(Type[Source], import_from_module(class_path))
|
2024-11-19 08:10:45 +01:00
|
|
|
|
|
|
|
|
|
|
|
def import_profiler_class(
    service_type: ServiceType, source_type: str
) -> Type[ProfilerInterface]:
    """Import the profiler interface class declared in the service spec.

    Raises:
        ValueError: If the spec for this source does not set ``profiler_class``.
    """
    spec = BaseSpec.get_for_source(service_type, source_type)
    profiler_path = spec.profiler_class
    if profiler_path:
        return cast(Type[ProfilerInterface], import_from_module(profiler_path))
    raise ValueError(
        f"Profiler class not found for service type {service_type} and source type {source_type}"
    )
|
|
|
|
|
|
|
|
|
|
|
|
def import_test_suite_class(
    service_type: ServiceType,
    source_type: str,
    source_config_type: Optional[str] = None,
) -> Type[TestSuiteInterface]:
    """Import the test suite interface class declared in the service spec.

    The spec is first looked up for ``source_type``. If that lookup raises
    ``DynamicImportException`` and ``source_config_type`` is truthy, the spec for
    ``source_config_type.lower()`` is used instead. Otherwise the original error
    is re-raised.

    Raises:
        DynamicImportException: If the spec (or the fallback spec) cannot be imported.
        ValueError: If the resolved spec does not set ``test_suite_class``.
    """
    try:
        spec = BaseSpec.get_for_source(service_type, source_type)
    except DynamicImportException:
        if not source_config_type:
            raise
        spec = BaseSpec.get_for_source(service_type, source_config_type.lower())

    suite_path = spec.test_suite_class
    if not suite_path:
        raise ValueError(
            f"Test suite class not found for service type {service_type} and source type {source_type}"
        )
    return cast(Type[TestSuiteInterface], import_from_module(suite_path))
|
|
|
|
|
|
|
|
|
|
|
|
def import_sampler_class(
    service_type: ServiceType,
    source_type: str,
    source_config_type: Optional[str] = None,
) -> Type[SamplerInterface]:
    """Import the sampler interface class declared in the service spec.

    The spec is first looked up for ``source_type``. If that lookup raises
    ``DynamicImportException`` and ``source_config_type`` is truthy, the spec for
    ``source_config_type.lower()`` is used instead. Otherwise the original error
    is re-raised.

    Raises:
        DynamicImportException: If the spec (or the fallback spec) cannot be imported.
        ValueError: If the resolved spec does not set ``sampler_class``.
    """
    try:
        spec = BaseSpec.get_for_source(service_type, source_type)
    except DynamicImportException:
        if not source_config_type:
            raise
        spec = BaseSpec.get_for_source(service_type, source_config_type.lower())

    sampler_path = spec.sampler_class
    if not sampler_path:
        raise ValueError(
            f"Sampler class not found for service type {service_type} and source type {source_type}"
        )
    return cast(Type[SamplerInterface], import_from_module(sampler_path))
|
2025-06-13 14:52:29 +02:00
|
|
|
|
|
|
|
|
|
|
|
def import_connection_class(
    service_type: ServiceType,
    source_type: str,
) -> Type[BaseConnection]:
    """
    Import the connection class for a given service type and source type.

    Raises:
        ValueError: If the service spec does not set ``connection_class``.
    """
    connection_path = BaseSpec.get_for_source(
        service_type, source_type
    ).connection_class
    if connection_path:
        return cast(Type[BaseConnection], import_from_module(connection_path))
    raise ValueError(
        f"Connection class not found for service type {service_type} and source type {source_type}"
    )
|