"""
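Manifests (service specs) that map each connector to the classes it uses for metadata, lineage
and usage ingestion, profiling, test suites and sampling.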
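
A minimal, standard-library-only sketch of how a source type resolves to a ServiceSpec import
path and to a spec field, mirroring BaseSpec.get_for_source and import_source_class below. It
assumes TYPE_SEPARATOR is "-" and that get_module_dir keeps the part before it; spec_path,
spec_field and the ServiceType stand-in are hypothetical names used only for illustration.

```python
from enum import Enum

# Assumption: the value of metadata.utils.importer.TYPE_SEPARATOR
TYPE_SEPARATOR = "-"


class ServiceType(Enum):
    # Hypothetical stand-in for the generated ServiceType enum
    Database = "Database"
    Dashboard = "Dashboard"


def spec_path(service_type: ServiceType, source_type: str, from_: str = "ingestion") -> str:
    # Assumed get_module_dir behaviour: keep only the connector name before the separator
    module_dir = source_type.split(TYPE_SEPARATOR)[0]
    return (
        f"metadata.{from_}.source.{service_type.name.lower()}"
        f".{module_dir}.service_spec.ServiceSpec"
    )


def spec_field(source_type: str) -> str:
    # Same dispatch as import_source_class: the suffix picks the spec field
    suffix = source_type.split(TYPE_SEPARATOR)[-1]
    if suffix in ("usage", "lineage"):
        return f"{suffix}_source_class"
    return "metadata_source_class"


print(spec_path(ServiceType.Database, "postgres-lineage"))
# metadata.ingestion.source.database.postgres.service_spec.ServiceSpec
print(spec_field("postgres-lineage"))  # lineage_source_class
print(spec_field("postgres"))  # metadata_source_class
```

Keeping only strings in the spec means nothing under metadata.ingestion.source is imported until
a caller actually asks for a class.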
"""

from typing import Optional, Type, cast

from pydantic import model_validator

from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.ingestion.api.steps import Source
from metadata.ingestion.models.custom_pydantic import BaseModel
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.sampler.sampler_interface import SamplerInterface
from metadata.utils.importer import (
TYPE_SEPARATOR,
get_class_path,
get_module_dir,
import_from_module,
)
from metadata.utils.logger import utils_logger

logger = utils_logger()


class BaseSpec(BaseModel):
"""
# The OpenMetadata Ingestion Service Specification (Spec)
    This is the API for defining a service in OpenMetadata. The spec must be importable from the
    connector's package, at the path:
    metadata.ingestion.source.{service_type}.{service_name}.service_spec.ServiceSpec
    Example for Postgres:
metadata.ingestion.source.database.postgres.service_spec.ServiceSpec
You can supply either strings with the full classpath or concrete classes that will be converted to strings.
The use of strings for the values gives us a few advantages:
    1. Manifests can be defined in JSON/YAML and deserialized into this class.
    2. We can import the class dynamically when needed and avoid dependency issues.
    3. We avoid circular imports.
    4. We can hot-swap the class implementation without changing the manifest (for example, in tests).
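
    A minimal sketch of the class-or-string convention, using only the standard library: a dataclass
    __post_init__ stands in for the pydantic model_validator on this class, and class_path/load_class
    are hypothetical stand-ins for get_class_path and import_from_module, whose exact formats may
    differ. It handles top-level classes only.

    ```python
    import importlib
    from dataclasses import dataclass, fields
    from json import JSONDecoder
    from typing import Any, Optional, Union


    def class_path(cls: type) -> str:
        # Hypothetical stand-in for get_class_path: "<module>.<name>" (top-level classes only)
        return f"{cls.__module__}.{cls.__qualname__}"


    def load_class(path: str) -> Any:
        # Hypothetical stand-in for import_from_module: import the module, then fetch the attribute
        module_name, _, attr = path.rpartition(".")
        return getattr(importlib.import_module(module_name), attr)


    @dataclass
    class MiniSpec:
        metadata_source_class: Union[str, type]
        lineage_source_class: Optional[Union[str, type]] = None

        def __post_init__(self) -> None:
            # Same idea as transform_fields: any class value becomes its class-path string
            for field in fields(self):
                value = getattr(self, field.name)
                if isinstance(value, type):
                    setattr(self, field.name, class_path(value))


    spec = MiniSpec(metadata_source_class=JSONDecoder)
    print(spec.metadata_source_class)  # json.decoder.JSONDecoder
    print(load_class(spec.metadata_source_class) is JSONDecoder)  # True
    ```

    Passing the class or its path string yields the same stored value, so a spec written in code and
    one deserialized from JSON/YAML compare equal.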
"""
profiler_class: Optional[str] = None
test_suite_class: Optional[str] = None
metadata_source_class: str
lineage_source_class: Optional[str] = None
usage_source_class: Optional[str] = None
sampler_class: Optional[str] = None
@model_validator(mode="before")
@classmethod
def transform_fields(cls, values):
        """Allow passing a class directly instead of its string class path.

        Any field value that is a class is converted to its class path with get_class_path."""
for field in list(cls.model_fields.keys()):
if isinstance(values.get(field), type):
values[field] = get_class_path(values[field])
        return values

    @classmethod
def get_for_source(
cls, service_type: ServiceType, source_type: str, from_: str = "ingestion"
) -> "BaseSpec":
        """Retrieve the manifest (ServiceSpec) for a given source type.

        The spec is imported from
        metadata.{from_}.source.{service_type}.{module_dir}.service_spec.ServiceSpec,
        where module_dir is derived from source_type with get_module_dir.

        Args:
            service_type (ServiceType): The service type.
            source_type (str): The source type.
            from_ (str, optional): The subpackage of `metadata` to import from. Defaults to "ingestion".
Returns:
BaseSpec: The manifest for the source type.
"""
return cls.model_validate(
import_from_module(
"metadata.{}.source.{}.{}.{}.ServiceSpec".format( # pylint: disable=C0209
from_,
service_type.name.lower(),
get_module_dir(source_type),
"service_spec",
)
)
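        )


def import_source_class(
    service_type: ServiceType, source_type: str, from_: str = "ingestion"
) -> Type[Source]:
    """Import the Source class for a source type.

    The suffix after TYPE_SEPARATOR selects the spec field: "usage" and "lineage" map to
    usage_source_class and lineage_source_class; any other value uses metadata_source_class.
    """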
source_class_type = source_type.split(TYPE_SEPARATOR)[-1]
if source_class_type in ["usage", "lineage"]:
field = f"{source_class_type}_source_class"
else:
field = "metadata_source_class"
spec = BaseSpec.get_for_source(service_type, source_type, from_)
return cast(
Type[Source],
import_from_module(spec.model_dump()[field]),
    )


def import_profiler_class(
    service_type: ServiceType, source_type: str
) -> Type[ProfilerInterface]:
    """Import the profiler interface class named by profiler_class
    in the connector's ServiceSpec."""
class_path = BaseSpec.get_for_source(service_type, source_type).profiler_class
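    return cast(Type[ProfilerInterface], import_from_module(class_path))


def import_test_suite_class(
    service_type: ServiceType, source_type: str
) -> Type[TestSuiteInterface]:
    """Import the test suite interface class named by test_suite_class
    in the connector's ServiceSpec."""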
class_path = BaseSpec.get_for_source(service_type, source_type).test_suite_class
    return cast(Type[TestSuiteInterface], import_from_module(class_path))


def import_sampler_class(
    service_type: ServiceType, source_type: str
) -> Type[SamplerInterface]:
    """Import the sampler interface class named by sampler_class
    in the connector's ServiceSpec."""
class_path = BaseSpec.get_for_source(service_type, source_type).sampler_class
return cast(Type[SamplerInterface], import_from_module(class_path))