diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/serviceType.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/serviceType.json new file mode 100644 index 00000000000..4a6b59b57c9 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/serviceType.json @@ -0,0 +1,17 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/serviceType.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Service Type", + "description": "This schema defines the service types entities which requires a connection.", + "type": "string", + "javaType": "org.openmetadata.catalog.entity.services.ServiceType", + "enum": [ + "Dashboard", + "Database", + "Messaging", + "Metadata", + "MlModel", + "Pipeline" + ], + "additionalProperties": false +} diff --git a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/workflow.json b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/workflow.json index 134167555bd..955f38c7b90 100644 --- a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/workflow.json +++ b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/workflow.json @@ -66,7 +66,7 @@ } }, "additionalProperties": false, - "required": ["type", "serviceName", "serviceConnection", "sourceConfig"] + "required": ["type", "serviceName", "sourceConfig"] }, "processor": { "description": "Configuration for Processor Component in the OpenMetadata Ingestion Framework.", diff --git a/ingestion/src/metadata/config/workflow.py b/ingestion/src/metadata/config/workflow.py index cfb152a2824..c9392b8d340 100644 --- a/ingestion/src/metadata/config/workflow.py +++ b/ingestion/src/metadata/config/workflow.py @@ -68,50 +68,6 @@ def get_class(key: str) -> Type[T]: return my_class -def get_source_dir(connection_type: type) -> str: - if connection_type == DatabaseConnection: - return "database" - if connection_type == MessagingConnection: - return "messaging" - if connection_type == MetadataConnection: - return "metadata" - if connection_type == DashboardConnection: - return "dashboard" - if connection_type == PipelineConnection: - return "pipeline" - if connection_type == MlModelConnection: - return "mlmodel" - - -def get_ingestion_source( - source_type: str, - source_config: WorkflowSource, - metadata_config: OpenMetadataConnection, -) -> Source: - """ - Import the required source class and configure it. - - :param source_type: Type specified in the config, e.g., redshift - :param source_config: Specific source configurations, such as the host - :param metadata_config: Metadata server configurations - """ - - source_class = get_class( - "metadata.ingestion.source.{}.{}.{}Source".format( - get_source_dir(type(source_config.serviceConnection.__root__)), - fetch_type_class(source_type, is_file=True), - fetch_type_class(source_type, is_file=False), - ) - ) - source: Source = source_class.create(source_config.dict(), metadata_config) - logger.debug(f"Source type:{source_type},{source_class} configured") - - source.prepare() - logger.debug(f"Source type:{source_type},{source_class} prepared") - - return source - - def get_sink( sink_type: str, sink_config: WorkflowSink, diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py index 5268cfcd9e9..f4629acf149 100644 --- a/ingestion/src/metadata/ingestion/api/topology_runner.py +++ b/ingestion/src/metadata/ingestion/api/topology_runner.py @@ -159,22 +159,31 @@ class TopologyRunnerMixin(Generic[C]): if entity is not None: if stage.ack_sink: - tries = 3 entity = None entity_fqn = self.fqn_from_context( stage=stage, entity_request=entity_request ) - while not entity and tries > 0: - yield entity_request - # Improve validation logic + # we get entity from OM if we do not want to overwrite existing data in OM + if not stage.overwrite: entity = self.metadata.get_by_name( entity=stage.type_, fqn=entity_fqn, fields=["*"], # Get all the available data from the Entity ) - tries -= 1 + # if entity does not exist in OM, or we want to overwrite, we will yield the entity_request + if entity is None: + tries = 3 + while not entity and tries > 0: + yield entity_request + # Improve validation logic + entity = self.metadata.get_by_name( + entity=stage.type_, + fqn=entity_fqn, + fields=["*"], # Get all the available data from the Entity + ) + tries -= 1 else: yield entity diff --git a/ingestion/src/metadata/ingestion/api/workflow.py b/ingestion/src/metadata/ingestion/api/workflow.py index eec75be755b..6554fc2af84 100644 --- a/ingestion/src/metadata/ingestion/api/workflow.py +++ b/ingestion/src/metadata/ingestion/api/workflow.py @@ -15,10 +15,10 @@ from typing import Type, TypeVar import click from metadata.config.common import WorkflowExecutionError -from metadata.config.workflow import get_source_dir from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) +from metadata.generated.schema.entity.services.serviceType import ServiceType from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) @@ -28,6 +28,11 @@ from metadata.ingestion.api.processor import Processor from metadata.ingestion.api.sink import Sink from metadata.ingestion.api.source import Source from metadata.ingestion.api.stage import Stage +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.class_helper import ( + get_service_class_from_service_type, + get_service_type_from_source_type, +) from metadata.utils.logger import ingestion_logger, set_loggers_level logger = ingestion_logger() @@ -56,9 +61,14 @@ class Workflow: set_loggers_level(config.workflowConfig.loggerLevel.value) source_type = self.config.source.type.lower() + + service_type: ServiceType = get_service_type_from_source_type( + self.config.source.type + ) + source_class = self.get( "metadata.ingestion.source.{}.{}.{}Source".format( - get_source_dir(type(self.config.source.serviceConnection.__root__)), + service_type.name.lower(), self.typeClassFetch(source_type, True), self.typeClassFetch(source_type, False), ) @@ -67,6 +77,8 @@ class Workflow: self.config.workflowConfig.openMetadataServerConfig ) + self._retrieve_service_connection_if_needed(metadata_config, service_type) + self.source: Source = source_class.create( self.config.source.dict(), metadata_config ) @@ -228,3 +240,35 @@ class Workflow: else: click.secho("Workflow finished successfully", fg="green", bold=True) return 0 + + def _retrieve_service_connection_if_needed( + self, metadata_config: OpenMetadataConnection, service_type: ServiceType + ) -> None: + """ + We override the current `serviceConnection` source config object if source workflow service already exists + in OM. When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it + from the service object itself through the default `SecretsManager`. + + :param metadata_config: OpenMetadata connection config + :param service_type: source workflow service type + :return: + """ + # We override the current serviceConnection source object if source workflow service already exists in OM. + # We retrieve the service connection from the secrets' manager when it is configured. Otherwise, we get it + # from the service object itself. + if not self._is_sample_source(self.config.source.type): + metadata = OpenMetadata(config=metadata_config) + service = metadata.get_by_name( + get_service_class_from_service_type(service_type), + self.config.source.serviceName, + ) + if service: + self.config.source.serviceConnection = ( + metadata.secrets_manager_client.retrieve_service_connection( + service, service_type.name.lower() + ) + ) + + @staticmethod + def _is_sample_source(service_type): + return service_type == "sample-data" or service_type == "sample-usage" diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 941a5f0a8fe..28aa96855ae 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -8,6 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import json import os import shutil diff --git a/ingestion/src/metadata/ingestion/bulksink/migrate.py b/ingestion/src/metadata/ingestion/bulksink/migrate.py index 5a11044025c..946bae8c3f9 100644 --- a/ingestion/src/metadata/ingestion/bulksink/migrate.py +++ b/ingestion/src/metadata/ingestion/bulksink/migrate.py @@ -8,6 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import json import logging import shutil diff --git a/ingestion/src/metadata/ingestion/models/ometa_policy.py b/ingestion/src/metadata/ingestion/models/ometa_policy.py index c752e51c258..cd0cee9b262 100644 --- a/ingestion/src/metadata/ingestion/models/ometa_policy.py +++ b/ingestion/src/metadata/ingestion/models/ometa_policy.py @@ -8,6 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from typing import Optional from pydantic import BaseModel diff --git a/ingestion/src/metadata/ingestion/models/ometa_table_db.py b/ingestion/src/metadata/ingestion/models/ometa_table_db.py index 8c91e94ab89..6613b8f4425 100644 --- a/ingestion/src/metadata/ingestion/models/ometa_table_db.py +++ b/ingestion/src/metadata/ingestion/models/ometa_table_db.py @@ -8,6 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from typing import Optional from pydantic import BaseModel diff --git a/ingestion/src/metadata/ingestion/models/table_metadata.py b/ingestion/src/metadata/ingestion/models/table_metadata.py index 72225a60c11..0a04fdd0d09 100644 --- a/ingestion/src/metadata/ingestion/models/table_metadata.py +++ b/ingestion/src/metadata/ingestion/models/table_metadata.py @@ -9,7 +9,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - from typing import List, Optional from pydantic import BaseModel diff --git a/ingestion/src/metadata/ingestion/models/topology.py b/ingestion/src/metadata/ingestion/models/topology.py index 95eac5b2d0f..cb69b02422b 100644 --- a/ingestion/src/metadata/ingestion/models/topology.py +++ b/ingestion/src/metadata/ingestion/models/topology.py @@ -39,6 +39,7 @@ class NodeStage(BaseModel, Generic[T]): False # If we need to cache all values being yielded in the context ) clear_cache: bool = False # If we need to clean cache values in the context for each produced element + overwrite: bool = True # If we want to overwrite existing data from OM consumer: Optional[ List[str] ] = None # keys in the source context to fetch state from the parent's context diff --git a/ingestion/src/metadata/ingestion/models/user.py b/ingestion/src/metadata/ingestion/models/user.py index 9192b22ff04..6041e4fe4fa 100644 --- a/ingestion/src/metadata/ingestion/models/user.py +++ b/ingestion/src/metadata/ingestion/models/user.py @@ -8,6 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from typing import List, Optional from pydantic.main import BaseModel diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 4d5b1f33b74..a729c20a7c7 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -79,7 +79,8 @@ class DashboardServiceTopology(ServiceTopology): NodeStage( type_=DashboardService, context="dashboard_service", - processor="yield_dashboard_service", + processor="yield_create_request_dashboard_service", + overwrite=False, ), NodeStage( type_=OMetaTagAndCategory, @@ -267,7 +268,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): def get_services(self) -> Iterable[WorkflowSource]: yield self.config - def yield_dashboard_service(self, config: WorkflowSource): + def yield_create_request_dashboard_service(self, config: WorkflowSource): yield self.metadata.get_create_service_from_source( entity=DashboardService, config=config ) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker.py b/ingestion/src/metadata/ingestion/source/dashboard/looker.py index df7d7ac3598..30f6aa671d2 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker.py @@ -8,6 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import traceback from datetime import datetime from typing import Iterable, List, Optional, Set, cast diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py index 9d6021d36a0..d61d118dea9 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py @@ -56,20 +56,6 @@ logger = ingestion_logger() class MetabaseSource(DashboardServiceSource): - """Metabase entity class - - Args: - config: - metadata_config: - Attributes: - config: - metadata_config: - status: - metabase_session: - dashboard_service: - charts: - metric_charts: - """ config: WorkflowSource metadata_config: OpenMetadataConnection @@ -81,19 +67,10 @@ class MetabaseSource(DashboardServiceSource): metadata_config: OpenMetadataConnection, ): super().__init__(config, metadata_config) - self.connection = get_connection(self.service_connection) self.metabase_session = self.connection.client["metabase_session"] @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): - """Instantiate object - - Args: - config_dict: - metadata_config: - Returns: - MetabaseSource - """ config = WorkflowSource.parse_obj(config_dict) connection: MetabaseConnection = config.serviceConnection.__root__.config if not isinstance(connection, MetabaseConnection): diff --git a/ingestion/src/metadata/ingestion/source/dashboard/mode.py b/ingestion/src/metadata/ingestion/source/dashboard/mode.py index 5920e7293f5..6a93dbfa266 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/mode.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/mode.py @@ -52,16 +52,6 @@ logger = ingestion_logger() class ModeSource(DashboardServiceSource): - """Mode entity class - Args: - config: - metadata_config: - Attributes: - config: - metadata_config: - charts: - """ - def __init__( self, config: WorkflowSource, @@ -73,13 +63,6 @@ class ModeSource(DashboardServiceSource): @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): - """Instantiate object - Args: - config_dict: - metadata_config: - Returns: - ModeSource - """ config = WorkflowSource.parse_obj(config_dict) connection: ModeConnection = config.serviceConnection.__root__.config if not isinstance(connection, ModeConnection): diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py index 045f34c2bb4..e91261b01d7 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py @@ -40,16 +40,6 @@ logger = ingestion_logger() class PowerbiSource(DashboardServiceSource): - """PowerBi entity class - Args: - config: - metadata_config: - Attributes: - config: - metadata_config: - charts: - """ - def __init__( self, config: WorkflowSource, @@ -59,13 +49,6 @@ class PowerbiSource(DashboardServiceSource): @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): - """Instantiate object - Args: - config_dict: - metadata_config: - Returns: - PowerBiSource - """ config = WorkflowSource.parse_obj(config_dict) connection: PowerBIConnection = config.serviceConnection.__root__.config if not isinstance(connection, PowerBIConnection): diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash.py b/ingestion/src/metadata/ingestion/source/dashboard/redash.py index 4efdb0e2155..a3976381ae7 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/redash.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash.py @@ -8,6 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from logging.config import DictConfigurator from typing import Iterable, List, Optional diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset.py b/ingestion/src/metadata/ingestion/source/dashboard/superset.py index d969ab0e92b..491de220d1b 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset.py @@ -16,8 +16,6 @@ import json import traceback from typing import Iterable, List, Optional -import dateutil.parser as dateparser - from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest @@ -87,22 +85,6 @@ def get_filter_name(filter_obj): class SupersetSource(DashboardServiceSource): - """ - Superset source class - - Args: - config: - metadata_config: - - Attributes: - config: - metadata_config: - status: - platform: - service_type: - service: - - """ config: WorkflowSource metadata_config: OpenMetadataConnection diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py index 04a37682121..b96cbacac80 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py @@ -59,17 +59,6 @@ TABLEAU_TAG_CATEGORY = "TableauTags" class TableauSource(DashboardServiceSource): - """Tableau source entity class - - Args: - config: - metadata_config: - - Attributes: - config: - metadata_config: - all_dashboard_details: - """ config: WorkflowSource metadata_config: OpenMetadataConnection diff --git a/ingestion/src/metadata/ingestion/source/database/athena.py b/ingestion/src/metadata/ingestion/source/database/athena.py index 793df147d1c..a6c479ba862 100644 --- a/ingestion/src/metadata/ingestion/source/database/athena.py +++ b/ingestion/src/metadata/ingestion/source/database/athena.py @@ -8,6 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from typing import Iterable, Optional, Tuple from pyathena.sqlalchemy_athena import AthenaDialect @@ -104,7 +105,6 @@ class AthenaSource(CommonDbSourceService): raise InvalidSourceException( f"Expected AthenaConnection, but got {connection}" ) - return cls(config, metadata_config) def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]: diff --git a/ingestion/src/metadata/ingestion/source/database/azuresql.py b/ingestion/src/metadata/ingestion/source/database/azuresql.py index 53dbdbeca43..d6918f69511 100644 --- a/ingestion/src/metadata/ingestion/source/database/azuresql.py +++ b/ingestion/src/metadata/ingestion/source/database/azuresql.py @@ -24,13 +24,6 @@ from metadata.ingestion.source.database.common_db_source import CommonDbSourceSe class AzuresqlSource(CommonDbSourceService): - """Azure SQL Source class - - Args: - config: - metadata_config: - """ - @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): config: WorkflowSource = WorkflowSource.parse_obj(config_dict) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery.py b/ingestion/src/metadata/ingestion/source/database/bigquery.py index 475b98e1421..27aa398824e 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery.py @@ -101,7 +101,6 @@ class BigquerySource(CommonDbSourceService): raise InvalidSourceException( f"Expected BigQueryConnection, but got {connection}" ) - return cls(config, metadata_config) def standardize_table_name(self, schema: str, table: str) -> str: diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery_usage.py b/ingestion/src/metadata/ingestion/source/database/bigquery_usage.py index 02802763a77..51baa2359c6 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery_usage.py @@ -29,7 +29,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.database.usage_source import UsageSource -from metadata.utils.credentials import set_google_credentials from metadata.utils.logger import ingestion_logger from metadata.utils.sql_queries import BIGQUERY_USAGE_STATEMENT @@ -56,11 +55,6 @@ class BigqueryUsageSource(UsageSource): raise InvalidSourceException( f"Expected BigQueryConnection, but got {connection}" ) - - set_google_credentials( - gcs_credentials=config.serviceConnection.__root__.config.credentials - ) - return cls(config, metadata_config) def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str: diff --git a/ingestion/src/metadata/ingestion/source/database/clickhouse.py b/ingestion/src/metadata/ingestion/source/database/clickhouse.py index 9585f1aea25..c3850f7809e 100644 --- a/ingestion/src/metadata/ingestion/source/database/clickhouse.py +++ b/ingestion/src/metadata/ingestion/source/database/clickhouse.py @@ -172,5 +172,4 @@ class ClickhouseSource(CommonDbSourceService): raise InvalidSourceException( f"Expected ClickhouseConnection, but got {connection}" ) - return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/database/clickhouse_usage.py b/ingestion/src/metadata/ingestion/source/database/clickhouse_usage.py index dadf321e949..74553891f7a 100644 --- a/ingestion/src/metadata/ingestion/source/database/clickhouse_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/clickhouse_usage.py @@ -17,31 +17,32 @@ import ast from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import ( ClickhouseConnection, ) +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.database.usage_source import UsageSource from metadata.utils.sql_queries import CLICKHOUSE_SQL_USAGE_STATEMENT class ClickhouseUsageSource(UsageSource): - def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig): + def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): super().__init__(config, metadata_config) self.sql_stmt = CLICKHOUSE_SQL_USAGE_STATEMENT.format( start_time=self.start, end_time=self.end ) @classmethod - def create(cls, config_dict, metadata_config: WorkflowConfig): + def create(cls, config_dict, metadata_config: OpenMetadataConnection): config: WorkflowSource = WorkflowSource.parse_obj(config_dict) connection: ClickhouseConnection = config.serviceConnection.__root__.config if not isinstance(connection, ClickhouseConnection): raise InvalidSourceException( f"Expected ClickhouseConnection, but got {connection}" ) - return cls(config, metadata_config) def get_schema_name(self, data: dict) -> str: diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 23fc5a2b2a4..030f17ef1fb 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -48,7 +48,6 @@ from metadata.ingestion.source.database.database_service import ( ) from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandlerMixin from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource -from metadata.utils import fqn from metadata.utils.connections import get_connection, test_connection from metadata.utils.filters import filter_by_schema, filter_by_table from metadata.utils.logger import ingestion_logger @@ -76,13 +75,14 @@ class CommonDbSourceService( self.source_config: DatabaseServiceMetadataPipeline = ( self.config.sourceConfig.config ) + + self.metadata_config = metadata_config + self.metadata = OpenMetadata(self.metadata_config) + # It will be one of the Unions. We don't know the specific type here. self.service_connection = self.config.serviceConnection.__root__.config self.status = SQLSourceStatus() - self.metadata_config = metadata_config - self.metadata = OpenMetadata(metadata_config) - self.engine: Engine = get_connection(self.service_connection) self.test_connection() diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index ae97f6beab8..d1468489072 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -109,7 +109,8 @@ class DatabaseServiceTopology(ServiceTopology): NodeStage( type_=DatabaseService, context="database_service", - processor="yield_database_service", + processor="yield_create_request_database_service", + overwrite=False, ), NodeStage( type_=StorageService, @@ -247,7 +248,7 @@ class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC): def get_services(self) -> Iterable[WorkflowSource]: yield self.config - def yield_database_service(self, config: WorkflowSource): + def yield_create_request_database_service(self, config: WorkflowSource): yield self.metadata.get_create_service_from_source( entity=DatabaseService, config=config ) diff --git a/ingestion/src/metadata/ingestion/source/database/databricks.py b/ingestion/src/metadata/ingestion/source/database/databricks.py index a9551e019da..2c0043b04ad 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks.py @@ -130,5 +130,4 @@ class DatabricksSource(CommonDbSourceService): raise InvalidSourceException( f"Expected DatabricksConnection, but got {connection}" ) - return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/database/datalake.py b/ingestion/src/metadata/ingestion/source/database/datalake.py index 05131d47e29..819d75f6990 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake.py @@ -35,7 +35,6 @@ from metadata.generated.schema.entity.services.connections.database.datalakeConn from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) -from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( DatabaseServiceMetadataPipeline, ) @@ -76,7 +75,6 @@ DATALAKE_SUPPORTED_FILE_TYPES = (".csv", ".tsv", ".json", ".parquet") class DatalakeSource(DatabaseServiceSource): def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): self.status = SQLSourceStatus() - self.config = config self.source_config: DatabaseServiceMetadataPipeline = ( self.config.sourceConfig.config diff --git a/ingestion/src/metadata/ingestion/source/database/db2.py b/ingestion/src/metadata/ingestion/source/database/db2.py index c472245547d..6b31920e7b9 100644 --- a/ingestion/src/metadata/ingestion/source/database/db2.py +++ b/ingestion/src/metadata/ingestion/source/database/db2.py @@ -11,7 +11,6 @@ from ibm_db_sa.base import DB2Dialect from sqlalchemy.engine import reflection -from sqlalchemy.engine.reflection import Inspector from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, diff --git a/ingestion/src/metadata/ingestion/source/database/deltalake.py b/ingestion/src/metadata/ingestion/source/database/deltalake.py index f2583039f5b..f798b17465e 100644 --- a/ingestion/src/metadata/ingestion/source/database/deltalake.py +++ b/ingestion/src/metadata/ingestion/source/database/deltalake.py @@ -19,7 +19,6 @@ from metadata.generated.schema.entity.services.connections.database.deltaLakeCon from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) -from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( DatabaseServiceMetadataPipeline, ) diff --git a/ingestion/src/metadata/ingestion/source/database/druid.py b/ingestion/src/metadata/ingestion/source/database/druid.py index cfec6ebee79..e183bf10061 100644 --- a/ingestion/src/metadata/ingestion/source/database/druid.py +++ b/ingestion/src/metadata/ingestion/source/database/druid.py @@ -9,7 +9,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - from metadata.generated.schema.entity.services.connections.database.druidConnection import ( DruidConnection, ) diff --git a/ingestion/src/metadata/ingestion/source/database/dynamodb.py b/ingestion/src/metadata/ingestion/source/database/dynamodb.py index 648cbf6409f..0323a9ed37b 100644 --- a/ingestion/src/metadata/ingestion/source/database/dynamodb.py +++ b/ingestion/src/metadata/ingestion/source/database/dynamodb.py @@ -14,7 +14,6 @@ from metadata.generated.schema.entity.services.connections.database.dynamoDBConn from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) -from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( DatabaseServiceMetadataPipeline, ) diff --git a/ingestion/src/metadata/ingestion/source/database/glue.py b/ingestion/src/metadata/ingestion/source/database/glue.py index 0b999a0a958..bd6a7e0a30e 100755 --- a/ingestion/src/metadata/ingestion/source/database/glue.py +++ b/ingestion/src/metadata/ingestion/source/database/glue.py @@ -27,7 +27,6 @@ from metadata.generated.schema.entity.services.connections.database.glueConnecti from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) -from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( DatabaseServiceMetadataPipeline, ) @@ -35,7 +34,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import InvalidSourceException, SourceStatus from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory from metadata.ingestion.ometa.ometa_api import OpenMetadata diff --git a/ingestion/src/metadata/ingestion/source/database/hive.py b/ingestion/src/metadata/ingestion/source/database/hive.py index 53e240eb025..0a106c4ad6e 100644 --- a/ingestion/src/metadata/ingestion/source/database/hive.py +++ b/ingestion/src/metadata/ingestion/source/database/hive.py @@ -124,9 +124,6 @@ HiveDialect.get_view_names = get_view_names class HiveSource(CommonDbSourceService): - def prepare(self): - return super().prepare() - @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): config = WorkflowSource.parse_obj(config_dict) diff --git a/ingestion/src/metadata/ingestion/source/database/mariadb.py b/ingestion/src/metadata/ingestion/source/database/mariadb.py index b8da0516bca..c42e4044216 100644 --- a/ingestion/src/metadata/ingestion/source/database/mariadb.py +++ b/ingestion/src/metadata/ingestion/source/database/mariadb.py @@ -9,9 +9,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - -from sqlalchemy.engine.reflection import Inspector - from metadata.generated.schema.entity.services.connections.database.mariaDBConnection import ( MariaDBConnection, ) @@ -37,5 +34,4 @@ class MariadbSource(CommonDbSourceService): raise InvalidSourceException( f"Expected MariaDBConnection, but got {connection}" ) - return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/database/mssql.py b/ingestion/src/metadata/ingestion/source/database/mssql.py index 1c40ddb2526..34ec8a82bc4 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql.py @@ -11,9 +11,6 @@ """MSSQL source module""" from typing import Iterable -from sqlalchemy.engine.reflection import Inspector -from sqlalchemy.inspection import inspect - from metadata.generated.schema.entity.services.connections.database.mssqlConnection import ( MssqlConnection, ) @@ -32,14 +29,6 @@ logger = ingestion_logger() class MssqlSource(CommonDbSourceService): - """MSSQL Source class - - Args: - config: - metadata_config: - ctx - """ - @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): """Create class instance""" diff --git a/ingestion/src/metadata/ingestion/source/database/mysql.py b/ingestion/src/metadata/ingestion/source/database/mysql.py index dee1bdfbb03..d90e1f86054 100644 --- a/ingestion/src/metadata/ingestion/source/database/mysql.py +++ b/ingestion/src/metadata/ingestion/source/database/mysql.py @@ -34,5 +34,4 @@ class MysqlSource(CommonDbSourceService): raise InvalidSourceException( f"Expected MysqlConnection, but got {connection}" ) - return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/database/pinotdb.py b/ingestion/src/metadata/ingestion/source/database/pinotdb.py index 53ec9aff22c..3c239c1684d 100644 --- a/ingestion/src/metadata/ingestion/source/database/pinotdb.py +++ b/ingestion/src/metadata/ingestion/source/database/pinotdb.py @@ -9,7 +9,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - from metadata.generated.schema.entity.services.connections.database.pinotDBConnection import ( PinotDBConnection, ) @@ -35,5 +34,4 @@ class PinotdbSource(CommonDbSourceService): raise InvalidSourceException( f"Expected PinotdbConnection, but got {connection}" ) - return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/database/postgres.py b/ingestion/src/metadata/ingestion/source/database/postgres.py index 867ce6b3196..6bad80cf1d5 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres.py @@ -44,7 +44,6 @@ class PostgresSource(CommonDbSourceService): raise InvalidSourceException( f"Expected PostgresConnection, but got {connection}" ) - return cls(config, metadata_config) def get_database_names(self) -> Iterable[str]: diff --git a/ingestion/src/metadata/ingestion/source/database/presto.py b/ingestion/src/metadata/ingestion/source/database/presto.py index 457e328705c..ad999148c42 100644 --- a/ingestion/src/metadata/ingestion/source/database/presto.py +++ b/ingestion/src/metadata/ingestion/source/database/presto.py @@ -86,9 +86,6 @@ class PrestoSource(CommonDbSourceService): """ def __init__(self, config, metadata_config): - self.presto_connection: PrestoConnection = ( - config.serviceConnection.__root__.config - ) super().__init__(config, metadata_config) @classmethod @@ -103,4 +100,4 @@ class PrestoSource(CommonDbSourceService): def get_database_names(self) -> Iterable[str]: self.inspector = inspect(self.engine) - yield self.presto_connection.catalog + yield self.service_connection.catalog diff --git a/ingestion/src/metadata/ingestion/source/database/query_log_usage.py b/ingestion/src/metadata/ingestion/source/database/query_log_usage.py index cddc33b29a0..53f4dbd2dec 100644 --- a/ingestion/src/metadata/ingestion/source/database/query_log_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/query_log_usage.py @@ -30,6 +30,5 @@ class QueryLogUsageSource(UsageSource): @classmethod def create(cls, config_dict, metadata_config: WorkflowConfig): - """Create class instance""" config: WorkflowSource = WorkflowSource.parse_obj(config_dict) return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift.py b/ingestion/src/metadata/ingestion/source/database/redshift.py index b903f3d0fbf..1d0d28507c6 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift.py @@ -441,27 +441,11 @@ STANDARD_TABLE_TYPES = { # pylint: disable=useless-super-delegation class RedshiftSource(CommonDbSourceService): - """ - Redshift source cloass - - Args: - confi: - metadata_config: - """ - def __init__(self, config, metadata_config): super().__init__(config, metadata_config) @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): - """ - Create source - - Args: - config_dict: - metadata_config: - Returns: - """ config: WorkflowSource = WorkflowSource.parse_obj(config_dict) connection: RedshiftConnection = config.serviceConnection.__root__.config if not isinstance(connection, RedshiftConnection): diff --git a/ingestion/src/metadata/ingestion/source/database/redshift_usage.py b/ingestion/src/metadata/ingestion/source/database/redshift_usage.py index c329f4dac58..c8c44841e6f 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift_usage.py @@ -16,14 +16,14 @@ from typing import Iterator, Union from metadata.generated.schema.entity.services.connections.database.redshiftConnection import ( RedshiftConnection, ) +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.database.usage_source import UsageSource - -# pylint: disable=useless-super-delegation from metadata.utils.logger import ingestion_logger from metadata.utils.sql_queries import REDSHIFT_SQL_STATEMENT @@ -34,7 +34,7 @@ class RedshiftUsageSource(UsageSource): SQL_STATEMENT = REDSHIFT_SQL_STATEMENT - def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig): + def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): super().__init__(config, metadata_config) self.sql_stmt = RedshiftUsageSource.SQL_STATEMENT.format( start_time=self.start, end_time=self.end @@ -43,7 +43,7 @@ class RedshiftUsageSource(UsageSource): self._database = "redshift" @classmethod - def create(cls, config_dict, metadata_config: WorkflowConfig): + def create(cls, config_dict, metadata_config: OpenMetadataConnection): config: WorkflowSource = WorkflowSource.parse_obj(config_dict) connection: RedshiftConnection = config.serviceConnection.__root__.config if not isinstance(connection, RedshiftConnection): diff --git a/ingestion/src/metadata/ingestion/source/database/salesforce.py b/ingestion/src/metadata/ingestion/source/database/salesforce.py index 0ceaabe1a14..727b66437ab 100644 --- a/ingestion/src/metadata/ingestion/source/database/salesforce.py +++ b/ingestion/src/metadata/ingestion/source/database/salesforce.py @@ -25,7 +25,6 @@ from metadata.generated.schema.entity.services.connections.database.salesforceCo from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) -from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( DatabaseServiceMetadataPipeline, ) @@ -71,7 +70,6 @@ class SalesforceSource(DatabaseServiceSource): raise InvalidSourceException( f"Expected SalesforceConnection, but got {connection}" ) - return cls(config, metadata_config) def get_database_names(self) -> Iterable[str]: diff --git a/ingestion/src/metadata/ingestion/source/database/sample_usage.py b/ingestion/src/metadata/ingestion/source/database/sample_usage.py index ed8f2e4c79a..60c56cf9dd1 100644 --- a/ingestion/src/metadata/ingestion/source/database/sample_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/sample_usage.py @@ -70,7 +70,7 @@ class SampleUsageSource(UsageSource): connection: SampleDataConnection = config.serviceConnection.__root__.config if not isinstance(connection, SampleDataConnection): raise InvalidSourceException( - f"Expected MssqlConnection, but got {connection}" + f"Expected SampleDataConnection, but got {connection}" ) return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/database/singlestore.py b/ingestion/src/metadata/ingestion/source/database/singlestore.py index 067eb5c9ee0..13bc8bf8100 100644 --- a/ingestion/src/metadata/ingestion/source/database/singlestore.py +++ b/ingestion/src/metadata/ingestion/source/database/singlestore.py @@ -9,7 +9,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import ( SingleStoreConnection, ) @@ -35,5 +34,4 @@ class SinglestoreSource(CommonDbSourceService): raise InvalidSourceException( f"Expected SingleStoreConnection, but got {connection}" ) - return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake.py b/ingestion/src/metadata/ingestion/source/database/snowflake.py index 3518a07dab7..f5bd53a8244 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake.py @@ -8,6 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from typing import Iterable from snowflake.sqlalchemy.custom_types import VARIANT diff --git a/ingestion/src/metadata/ingestion/source/database/sqlite.py b/ingestion/src/metadata/ingestion/source/database/sqlite.py index a16b48d5290..3ece35e4c52 100644 --- a/ingestion/src/metadata/ingestion/source/database/sqlite.py +++ b/ingestion/src/metadata/ingestion/source/database/sqlite.py @@ -39,5 +39,4 @@ class SqliteSource(CommonDbSourceService): raise InvalidSourceException( f"Expected SQLiteConnection, but got {connection}" ) - return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/database/usage_source.py b/ingestion/src/metadata/ingestion/source/database/usage_source.py index b5bdb566858..fbc1530d01f 100644 --- a/ingestion/src/metadata/ingestion/source/database/usage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/usage_source.py @@ -25,6 +25,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.common_db_source import SQLSourceStatus from metadata.utils.connections import get_connection, test_connection from metadata.utils.filters import filter_by_database, filter_by_schema @@ -42,7 +43,8 @@ class UsageSource(Source[TableQuery], ABC): super().__init__() self.config = config self.metadata_config = metadata_config - self.connection = config.serviceConnection.__root__.config + self.metadata = OpenMetadata(metadata_config) + self.connection = self.config.serviceConnection.__root__.config self.source_config = self.config.sourceConfig.config self.start, self.end = get_start_and_end(self.source_config.queryLogDuration) self.analysis_date = self.end diff --git a/ingestion/src/metadata/ingestion/source/database/vertica.py b/ingestion/src/metadata/ingestion/source/database/vertica.py index 91c2102261c..afede37f36e 100644 --- a/ingestion/src/metadata/ingestion/source/database/vertica.py +++ b/ingestion/src/metadata/ingestion/source/database/vertica.py @@ -230,5 +230,4 @@ class VerticaSource(CommonDbSourceService): raise InvalidSourceException( f"Expected VerticaConnection, but got {connection}" ) - return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py index a32eaa4d981..90d4e5c7c8e 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py +++ b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py @@ -16,8 +16,6 @@ from abc import ABC, abstractmethod from dataclasses import dataclass, field from typing import Any, Iterable, List, Optional -from confluent_kafka.admin import AdminClient, ConfigResource - from metadata.generated.schema.api.data.createTopic import CreateTopicRequest from metadata.generated.schema.entity.data.topic import Topic from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( @@ -61,7 +59,8 @@ class MessagingServiceTopology(ServiceTopology): NodeStage( type_=MessagingService, context="messaging_service", - processor="yield_messaging_service", + processor="yield_create_request_messaging_service", + overwrite=False, ) ], children=["topic"], @@ -139,10 +138,10 @@ class MessagingServiceSource(TopologyRunnerMixin, Source, ABC): self.config = config self.metadata_config = metadata_config self.metadata = OpenMetadata(metadata_config) - self.service_connection = self.config.serviceConnection.__root__.config self.source_config: MessagingServiceMetadataPipeline = ( self.config.sourceConfig.config ) + self.service_connection = self.config.serviceConnection.__root__.config self.connection = get_connection(self.service_connection) self.test_connection() self.status = MessagingSourceStatus() @@ -160,7 +159,7 @@ class MessagingServiceSource(TopologyRunnerMixin, Source, ABC): continue yield topic_details - def yield_messaging_service(self, config: WorkflowSource): + def yield_create_request_messaging_service(self, config: WorkflowSource): yield self.metadata.get_create_service_from_source( entity=MessagingService, config=config ) diff --git a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py index 5a7da0de039..3618d08c1cd 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py +++ b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py @@ -39,6 +39,7 @@ from metadata.generated.schema.entity.services.databaseService import ( DatabaseService, DatabaseServiceType, ) +from metadata.generated.schema.entity.services.metadataService import MetadataService from metadata.generated.schema.entity.tags.tagCategory import Tag from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, @@ -114,9 +115,8 @@ class AmundsenSource(Source[Entity]): def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): self.config = config self.metadata_config = metadata_config - self.service_connection = config.serviceConnection.__root__.config self.metadata = OpenMetadata(self.metadata_config) - + self.service_connection = self.config.serviceConnection.__root__.config neo4j_config = Neo4JConfig( username=self.service_connection.username, password=self.service_connection.password.get_secret_value(), diff --git a/ingestion/src/metadata/ingestion/source/metadata/atlas.py b/ingestion/src/metadata/ingestion/source/metadata/atlas.py index 0055b8cd725..a65fda53727 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/atlas.py +++ b/ingestion/src/metadata/ingestion/source/metadata/atlas.py @@ -1,12 +1,10 @@ import traceback import uuid from dataclasses import dataclass, field -from distutils.command.config import config from pathlib import Path from typing import Any, Dict, Iterable, List import yaml -from importlib_metadata import SelectableGroups from metadata.generated.schema.api.data.createTopic import CreateTopicRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest @@ -22,6 +20,7 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata ) from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.messagingService import MessagingService +from metadata.generated.schema.entity.services.metadataService import MetadataService from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) @@ -52,7 +51,7 @@ class AtlasSourceStatus(SourceStatus): @dataclass class AtlasSource(Source): - config: AtlasConnection + config: WorkflowSource atlas_client: AtlasClient status: AtlasSourceStatus tables: Dict[str, Any] @@ -60,15 +59,14 @@ class AtlasSource(Source): def __init__( self, - config: AtlasConnection, + config: WorkflowSource, metadata_config: OpenMetadataConnection, ): super().__init__() self.config = config self.metadata_config = metadata_config - self.service_connection = config.serviceConnection.__root__.config - self.metadata = OpenMetadata(metadata_config) + self.service_connection = self.config.serviceConnection.__root__.config self.status = AtlasSourceStatus() self.schema_registry_url = "http://localhost:8081" diff --git a/ingestion/src/metadata/ingestion/source/metadata/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/metadata.py index e4d6df71a9a..97d27278a52 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/metadata.py @@ -35,6 +35,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -42,24 +43,12 @@ logger = ingestion_logger() @dataclass class MetadataSourceStatus(SourceStatus): - """Metadata Source class -- extends SourceStatus class - - Attributes: - success: - failures: - warnings: - """ success: List[str] = field(default_factory=list) failures: List[str] = field(default_factory=list) warnings: List[str] = field(default_factory=list) def scanned_entity(self, entity_class_name: str, entity_name: str) -> None: - """scanned entity method - - Args: - entity_name (str): - """ self.success.append(entity_name) logger.info("%s Scanned: %s", entity_class_name, entity_name) @@ -67,33 +56,11 @@ class MetadataSourceStatus(SourceStatus): def filtered( self, table_name: str, err: str, dataset_name: str = None, col_type: str = None ) -> None: - """filtered methods - - Args: - table_name (str): - err (str): - """ self.warnings.append(table_name) logger.warning("Dropped Entity %s due to %s", table_name, err) class MetadataSource(Source[Entity]): - """Metadata source class - - Args: - config: - metadata_config: - - Attributes: - config: - report: - metadata_config: - status: - wrote_something: - metadata: - tables: - topics: - """ config: WorkflowSource report: SourceStatus @@ -106,10 +73,9 @@ class MetadataSource(Source[Entity]): super().__init__() self.config = config self.metadata_config = metadata_config - self.service_connection = config.serviceConnection.__root__.config + self.metadata = OpenMetadata(metadata_config) self.status = MetadataSourceStatus() self.wrote_something = False - self.metadata = None self.tables = None self.topics = None diff --git a/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch.py b/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch.py index 73b2a1304a4..f9ebecdead0 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch.py +++ b/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch.py @@ -10,7 +10,6 @@ # limitations under the License. """Metadata source module""" - from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import ( MetadataESConnection, ) @@ -21,7 +20,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.ingestion.api.source import InvalidSourceException, SourceStatus -from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.metadata.metadata import MetadataSource from metadata.utils.logger import ingestion_logger @@ -29,22 +27,6 @@ logger = ingestion_logger() class MetadataElasticsearchSource(MetadataSource): - """MetadataElasticsearch class - - Args: - config: - metadata_config: - - Attributes: - config: - report: - metadata_config: - status: - wrote_something: - metadata: - tables: - topics: - """ config: WorkflowSource report: SourceStatus @@ -55,7 +37,6 @@ class MetadataElasticsearchSource(MetadataSource): metadata_config: OpenMetadataConnection, ): super().__init__(config=config, metadata_config=metadata_config) - self.metadata = OpenMetadata(self.metadata_config) @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): diff --git a/ingestion/src/metadata/ingestion/source/metadata/migrate.py b/ingestion/src/metadata/ingestion/source/metadata/migrate.py index 46e5c2020f3..1e4128167de 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/migrate.py +++ b/ingestion/src/metadata/ingestion/source/metadata/migrate.py @@ -29,7 +29,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import InvalidSourceException, SourceStatus -from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.metadata import MetadataSource logger = logging.getLogger(__name__) @@ -71,22 +70,6 @@ class DatabaseServiceWrapper: class MigrateSource(MetadataSource): - """OpenmetadataSource class - - Args: - config: - metadata_config: - - Attributes: - config: - report: - metadata_config: - status: - wrote_something: - metadata: - tables: - topics: - """ config: WorkflowSource report: SourceStatus @@ -97,9 +80,6 @@ class MigrateSource(MetadataSource): metadata_config: OpenMetadataConnection, ): super().__init__(config, metadata_config) - self.metadata = OpenMetadata( - OpenMetadataConnection.parse_obj(self.service_connection) - ) @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): diff --git a/ingestion/src/metadata/ingestion/source/metadata/openmetadata.py b/ingestion/src/metadata/ingestion/source/metadata/openmetadata.py index 282b8e1e3e0..5e710a75307 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/openmetadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/openmetadata.py @@ -18,7 +18,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.ingestion.api.source import InvalidSourceException, SourceStatus -from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.metadata.metadata import MetadataSource from metadata.utils.logger import ingestion_logger @@ -26,22 +25,6 @@ logger = ingestion_logger() class OpenmetadataSource(MetadataSource): - """OpenmetadataSource class - - Args: - config: - metadata_config: - - Attributes: - config: - report: - metadata_config: - status: - wrote_something: - metadata: - tables: - topics: - """ config: WorkflowSource report: SourceStatus @@ -52,9 +35,6 @@ class OpenmetadataSource(MetadataSource): metadata_config: OpenMetadataConnection, ): super().__init__(config, metadata_config) - self.metadata = OpenMetadata( - OpenMetadataConnection.parse_obj(self.service_connection) - ) @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlflow.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlflow.py index 705703d9026..f715d1a983b 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlflow.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlflow.py @@ -59,7 +59,6 @@ class MlflowSource(MlModelServiceSource): raise InvalidSourceException( f"Expected MysqlConnection, but got {connection}" ) - return cls(config, metadata_config) def get_mlmodels(self) -> Iterable[Tuple[RegisteredModel, ModelVersion]]: diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py index 063e0bdfdc6..cc1696f79f2 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py @@ -65,7 +65,8 @@ class MlModelServiceTopology(ServiceTopology): NodeStage( type_=MlModelService, context="mlmodel_service", - processor="yield_mlmodel_service", + processor="yield_create_request_mlmodel_service", + overwrite=False, ), ], children=["mlmodel"], @@ -153,7 +154,7 @@ class MlModelServiceSource(TopologyRunnerMixin, Source, ABC): def get_services(self) -> Iterable[WorkflowSource]: yield self.config - def yield_mlmodel_service(self, config: WorkflowSource): + def yield_create_request_mlmodel_service(self, config: WorkflowSource): yield self.metadata.get_create_service_from_source( entity=MlModelService, config=config ) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py index 205a2dd50f6..1de104cdd50 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py @@ -17,7 +17,6 @@ from typing import Any, Iterable, List, Optional, cast from airflow.models import BaseOperator, DagRun from airflow.models.serialized_dag import SerializedDagModel from airflow.serialization.serialized_objects import SerializedDAG -from sqlalchemy.engine import Engine from sqlalchemy.orm import Session from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest @@ -43,11 +42,7 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource -from metadata.utils.connections import ( - create_and_bind_session, - get_connection, - test_connection, -) +from metadata.utils.connections import create_and_bind_session, test_connection from metadata.utils.helpers import datetime_to_ts from metadata.utils.logger import ingestion_logger @@ -73,11 +68,8 @@ class AirflowSource(PipelineServiceSource): config: WorkflowSource, metadata_config: OpenMetadataConnection, ): - self._session = None - self.service_connection = config.serviceConnection.__root__.config - self.engine: Engine = get_connection(self.service_connection.connection) super().__init__(config, metadata_config) - # Create the connection to the database + self._session = None @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): diff --git a/ingestion/src/metadata/ingestion/source/pipeline/fivetran.py b/ingestion/src/metadata/ingestion/source/pipeline/fivetran.py index 71d79c27977..cb25037f758 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/fivetran.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/fivetran.py @@ -12,7 +12,6 @@ Airbyte source to extract metadata """ -from tokenize import group from typing import Iterable, Optional from pydantic import BaseModel diff --git a/ingestion/src/metadata/ingestion/source/pipeline/glue.py b/ingestion/src/metadata/ingestion/source/pipeline/glue.py index ece567e7aa8..1cf17e8278c 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/glue.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/glue.py @@ -8,8 +8,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import traceback -from typing import Any, Iterable, Optional +from typing import Any, Iterable, List, Optional from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest @@ -97,7 +98,7 @@ class GlueSource(PipelineServiceSource): ) yield pipeline_ev - def get_tasks(self, pipeline_details: Any) -> Task: + def get_tasks(self, pipeline_details: Any) -> List[Task]: task_list = [] for task in pipeline_details["Graph"]["Nodes"]: self.task_id_mapping[task["UniqueId"]] = task["Name"][:128] diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index ab46754b014..7266bebaa32 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -62,7 +62,8 @@ class PipelineServiceTopology(ServiceTopology): NodeStage( type_=PipelineService, context="pipeline_service", - processor="yield_pipeline_service", + processor="yield_create_request_pipeline_service", + overwrite=False, ), ], children=["pipeline"], @@ -171,7 +172,6 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC): topology = PipelineServiceTopology() context = create_source_context(topology) - @abstractmethod def __init__( self, config: WorkflowSource, @@ -200,7 +200,7 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC): def get_services(self) -> Iterable[WorkflowSource]: yield self.config - def yield_pipeline_service(self, config: WorkflowSource): + def yield_create_request_pipeline_service(self, config: WorkflowSource): yield self.metadata.get_create_service_from_source( entity=PipelineService, config=config ) diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py index 72186b98711..65a07daf158 100644 --- a/ingestion/src/metadata/orm_profiler/api/workflow.py +++ b/ingestion/src/metadata/orm_profiler/api/workflow.py @@ -30,6 +30,7 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata OpenMetadataConnection, ) from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.entity.services.serviceType import ServiceType from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import ( DatabaseServiceProfilerPipeline, ) @@ -44,6 +45,10 @@ from metadata.orm_profiler.api.models import ProfilerProcessorConfig, ProfilerRe from metadata.orm_profiler.interfaces.interface_protocol import InterfaceProtocol from metadata.orm_profiler.interfaces.sqa_profiler_interface import SQAProfilerInterface from metadata.utils import fqn +from metadata.utils.class_helper import ( + get_service_class_from_service_type, + get_service_type_from_source_type, +) from metadata.utils.connections import get_connection, test_connection from metadata.utils.filters import filter_by_fqn from metadata.utils.logger import profiler_logger @@ -67,6 +72,11 @@ class ProfilerWorkflow: self.config.workflowConfig.openMetadataServerConfig ) + # OpenMetadata client to fetch tables + self.metadata = OpenMetadata(self.metadata_config) + + self._retrieve_service_connection_if_needed() + # Prepare the connection to the source service # We don't need the whole Source class, as it is the OM Server @@ -86,9 +96,6 @@ class ProfilerWorkflow: _from="orm_profiler", ) - # OpenMetadata client to fetch tables - self.metadata = OpenMetadata(self.metadata_config) - if not self._validate_service_name(): raise ValueError( f"Service name `{self.config.source.serviceName}` does not exist. " @@ -302,3 +309,30 @@ class ProfilerWorkflow: """ self.metadata.close() self.processor.close() + + def _retrieve_service_connection_if_needed(self) -> None: + """ + We override the current `serviceConnection` source config object if source workflow service already exists + in OM. When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it + from the service object itself through the default `SecretsManager`. + + :return: + """ + if not self._is_sample_source(self.config.source.type): + service_type: ServiceType = get_service_type_from_source_type( + self.config.source.type + ) + service = self.metadata.get_by_name( + get_service_class_from_service_type(service_type), + self.config.source.serviceName, + ) + if service: + self.config.source.serviceConnection = ( + self.metadata.secrets_manager_client.retrieve_service_connection( + service, service_type.name.lower() + ) + ) + + @staticmethod + def _is_sample_source(service_type): + return service_type == "sample-data" or service_type == "sample-usage" diff --git a/ingestion/src/metadata/utils/class_helper.py b/ingestion/src/metadata/utils/class_helper.py new file mode 100644 index 00000000000..99d16e176df --- /dev/null +++ b/ingestion/src/metadata/utils/class_helper.py @@ -0,0 +1,38 @@ +from pydoc import locate +from typing import Type + +from pydantic import BaseModel + +from metadata.generated.schema.entity.services.serviceType import ServiceType + + +def _clean(source_type: str): + source_type = source_type.replace("-", "_") + source_type = source_type.replace("_usage", "") + source_type = source_type.replace("_", "") + if source_type == "sample": + source_type = "sampledata" + return source_type + + +def _get_service_type_from(service_subtype: str) -> ServiceType: + + for service_type in ServiceType: + if service_subtype.lower() in [ + subtype.value.lower() + for subtype in locate( + f"metadata.generated.schema.entity.services.{service_type.name.lower()}Service.{service_type.name}ServiceType" + ) + or [] + ]: + return service_type + + +def get_service_type_from_source_type(source_type: str) -> ServiceType: + return _get_service_type_from(_clean(source_type)) + + +def get_service_class_from_service_type(service_type: ServiceType) -> Type[BaseModel]: + return locate( + f"metadata.generated.schema.entity.services.{service_type.name.lower()}Service.{service_type.name}Service" + ) diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py index 651251739bd..ddd5651b3ed 100644 --- a/ingestion/src/metadata/utils/helpers.py +++ b/ingestion/src/metadata/utils/helpers.py @@ -8,6 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import re from datetime import datetime, timedelta from typing import Any, Dict, Iterable, List, Optional diff --git a/ingestion/src/metadata/utils/s3_utils.py b/ingestion/src/metadata/utils/s3_utils.py index 7897a833c12..2180cbf6731 100644 --- a/ingestion/src/metadata/utils/s3_utils.py +++ b/ingestion/src/metadata/utils/s3_utils.py @@ -8,9 +8,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import json import os -from itertools import islice from typing import Any import pandas as pd diff --git a/ingestion/src/metadata/utils/secrets_manager.py b/ingestion/src/metadata/utils/secrets_manager.py index c39e0b01af9..30fab119bed 100644 --- a/ingestion/src/metadata/utils/secrets_manager.py +++ b/ingestion/src/metadata/utils/secrets_manager.py @@ -8,6 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import inspect import json from abc import abstractmethod @@ -25,6 +26,12 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata from metadata.generated.schema.entity.services.connections.serviceConnection import ( ServiceConnection, ) +from metadata.generated.schema.entity.services.dashboardService import DashboardService +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.entity.services.messagingService import MessagingService +from metadata.generated.schema.entity.services.metadataService import MetadataService +from metadata.generated.schema.entity.services.mlmodelService import MlModelService +from metadata.generated.schema.entity.services.pipelineService import PipelineService from metadata.generated.schema.security.client import ( auth0SSOClientConfig, azureSSOClientConfig, @@ -42,8 +49,16 @@ logger = ingestion_logger() SECRET_MANAGER_AIRFLOW_CONF = "openmetadata_secrets_manager" # new typing type wrapping types from the '__root__' field of 'ServiceConnection' class -ServiceConnectionType = NewType( - "ServiceConnectionType", ServiceConnection.__fields__["__root__"].type_ +ServiceWithConnectionType = NewType( + "ServiceWithConnectionType", + Union[ + DashboardService, + DatabaseService, + MessagingService, + MetadataService, + MlModelService, + PipelineService, + ], ) # new typing type wrapping types from the 'securityConfig' field of 'OpenMetadataConnection' class @@ -70,13 +85,13 @@ class SecretsManager(metaclass=Singleton): """ @abstractmethod - def add_service_config_connection( + def retrieve_service_connection( self, - service: ServiceConnectionType, + service: ServiceWithConnectionType, service_type: str, - ) -> None: + ) -> ServiceConnection: """ - Add the service connection config from the secret manager to a given service connection object. + Retrieve the service connection from the secret manager to a given service connection object. :param service: Service connection object e.g. DatabaseConnection :param service_type: Service type e.g. databaseService """ @@ -90,15 +105,6 @@ class SecretsManager(metaclass=Singleton): """ pass - @staticmethod - def to_service_simple(service_type: str) -> str: - """ - Return the service simple name. - :param service_type: Service type e.g. databaseService - :return: - """ - return service_type.replace("Service", "").lower() - @staticmethod def build_secret_id(*args: str) -> str: """ @@ -111,7 +117,8 @@ class SecretsManager(metaclass=Singleton): secret_suffix = "-".join([arg.lower() for arg in args]) return f"openmetadata-{secret_suffix}" - def get_service_connection_class(self, service_type: str) -> object: + @staticmethod + def get_service_connection_class(service_type: str) -> object: """ Returns the located service object by dotted path, importing as necessary. :param service_type: Service type e.g. databaseService @@ -121,20 +128,20 @@ class SecretsManager(metaclass=Singleton): ( clazz[1] for clazz in inspect.getmembers( - locate(f"metadata.generated.schema.entity.services.{service_type}"), + locate( + f"metadata.generated.schema.entity.services.{service_type}Service" + ), inspect.isclass, ) - if clazz[0].lower() - == f"{self.to_service_simple(service_type)}connection" + if clazz[0].lower() == f"{service_type}connection" ) ).__name__ return locate( - f"metadata.generated.schema.entity.services.{service_type}.{service_conn_name}" + f"metadata.generated.schema.entity.services.{service_type}Service.{service_conn_name}" ) - def get_connection_class( - self, service_type: str, service_connection_type: str - ) -> object: + @staticmethod + def get_connection_class(service_type: str, service_connection_type: str) -> object: """ Returns the located connection object by dotted path, importing as necessary. :param service_type: Service type e.g. databaseService @@ -145,7 +152,7 @@ class SecretsManager(metaclass=Singleton): service_connection_type[0].lower() + service_connection_type[1:] ) return locate( - f"metadata.generated.schema.entity.services.connections.{self.to_service_simple(service_type)}.{connection_py_file}Connection.{service_connection_type}Connection" + f"metadata.generated.schema.entity.services.connections.{service_type}.{connection_py_file}Connection.{service_connection_type}Connection" ) @@ -162,15 +169,15 @@ class LocalSecretsManager(SecretsManager): """ pass - def add_service_config_connection( + def retrieve_service_connection( self, - service: ServiceConnectionType, + service: ServiceWithConnectionType, service_type: str, - ) -> None: + ) -> ServiceConnection: """ The LocalSecretsManager does not modify the ServiceConnection object """ - pass + return ServiceConnection(__root__=service.connection) class AWSSecretsManager(SecretsManager): @@ -182,25 +189,26 @@ class AWSSecretsManager(SecretsManager): ) self.secretsmanager_client = session.client("secretsmanager") - def add_service_config_connection( + def retrieve_service_connection( self, - service: ServiceConnectionType, + service: ServiceWithConnectionType, service_type: str, - ) -> None: + ) -> ServiceConnection: service_connection_type = service.serviceType.value service_name = service.name.__root__ secret_id = self.build_secret_id( - self.to_service_simple(service_type), service_connection_type, service_name + service_type, service_connection_type, service_name ) connection_class = self.get_connection_class( service_type, service_connection_type ) service_conn_class = self.get_service_connection_class(service_type) - service.connection = service_conn_class( + service_connection = service_conn_class( config=connection_class.parse_obj( json.loads(self._get_string_value(secret_id)) ) ) + return ServiceConnection(__root__=service_connection) def add_auth_provider_security_config(self, config: OpenMetadataConnection) -> None: if config.authProvider == AuthProvider.no_auth: @@ -231,7 +239,7 @@ class AWSSecretsManager(SecretsManager): try: kwargs = {"SecretId": name} response = self.secretsmanager_client.get_secret_value(**kwargs) - logger.info("Got value for secret %s.", name) + logger.debug("Got value for secret %s.", name) except ClientError: logger.exception("Couldn't get value for secret %s.", name) raise diff --git a/ingestion/src/metadata/utils/source_connections.py b/ingestion/src/metadata/utils/source_connections.py index a169bc5e7a5..e4f4c25f112 100644 --- a/ingestion/src/metadata/utils/source_connections.py +++ b/ingestion/src/metadata/utils/source_connections.py @@ -9,7 +9,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - """ Hosts the singledispatch to build source URLs """ diff --git a/ingestion/tests/integration/ometa/test_ometa_secrets_manager.py b/ingestion/tests/integration/ometa/test_ometa_secrets_manager.py index 6e55026b1db..d6c593a793f 100644 --- a/ingestion/tests/integration/ometa/test_ometa_secrets_manager.py +++ b/ingestion/tests/integration/ometa/test_ometa_secrets_manager.py @@ -8,6 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from copy import copy from unittest import TestCase from unittest.mock import Mock, patch diff --git a/ingestion/tests/unit/metadata/utils/test_class_helper.py b/ingestion/tests/unit/metadata/utils/test_class_helper.py new file mode 100644 index 00000000000..6b3a9caf34f --- /dev/null +++ b/ingestion/tests/unit/metadata/utils/test_class_helper.py @@ -0,0 +1,52 @@ +import pytest + +from metadata.generated.schema.entity.services.dashboardService import DashboardService +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.entity.services.messagingService import MessagingService +from metadata.generated.schema.entity.services.metadataService import MetadataService +from metadata.generated.schema.entity.services.mlmodelService import MlModelService +from metadata.generated.schema.entity.services.pipelineService import PipelineService +from metadata.generated.schema.entity.services.serviceType import ServiceType +from metadata.utils.class_helper import ( + get_service_class_from_service_type, + get_service_type_from_source_type, +) + + +@pytest.mark.parametrize( + ("source_type", "expected_service_type"), + [ + ("looker", ServiceType.Dashboard), + ("mysql", ServiceType.Database), + ("kafka", ServiceType.Messaging), + ("amundsen", ServiceType.Metadata), + ("mlflow", ServiceType.MlModel), + ("airflow", ServiceType.Pipeline), + ("clickhouse_usage", ServiceType.Database), + ("sample-data", ServiceType.Database), + ("redshift-usage", ServiceType.Database), + ], +) +def test_get_service_type_from_source_type( + source_type: str, expected_service_type: ServiceType +): + actual_service_type = get_service_type_from_source_type(source_type) + assert actual_service_type == expected_service_type + + +@pytest.mark.parametrize( + ("service_type", "expected_service_class"), + [ + (ServiceType.Dashboard, DashboardService), + (ServiceType.Database, DatabaseService), + (ServiceType.Messaging, MessagingService), + (ServiceType.Metadata, MetadataService), + (ServiceType.MlModel, MlModelService), + (ServiceType.Pipeline, PipelineService), + ], +) +def test_get_service_class_from_service_type( + service_type: ServiceType, expected_service_class: object +): + actual_service_class = get_service_class_from_service_type(service_type) + assert actual_service_class == expected_service_class diff --git a/ingestion/tests/unit/metadata/utils/test_secrets_manager.py b/ingestion/tests/unit/metadata/utils/test_secrets_manager.py index 14127c339c5..6e37b57363f 100644 --- a/ingestion/tests/unit/metadata/utils/test_secrets_manager.py +++ b/ingestion/tests/unit/metadata/utils/test_secrets_manager.py @@ -27,6 +27,9 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata OpenMetadataConnection, SecretsManagerProvider, ) +from metadata.generated.schema.entity.services.connections.serviceConnection import ( + ServiceConnection, +) from metadata.generated.schema.entity.services.databaseService import ( DatabaseConnection, DatabaseService, @@ -55,8 +58,9 @@ AUTH_PROVIDER_CONFIG = {"secretKey": "/fake/path"} class TestSecretsManager(TestCase): - service_type: str = "databaseService" + service_type: str = "database" service: DatabaseService + service_connection: ServiceConnection database_connection = MysqlConnection(**DATABASE_CONNECTION) auth_provider_config = GoogleSSOClientConfig(**AUTH_PROVIDER_CONFIG) om_connection: OpenMetadataConnection @@ -64,6 +68,8 @@ class TestSecretsManager(TestCase): @classmethod def setUpClass(cls) -> None: cls.service = DatabaseService(**DATABASE_SERVICE) + cls.service.connection = DatabaseConnection(config=cls.database_connection) + cls.service_connection = ServiceConnection(__root__=cls.service.connection) cls.om_connection = OpenMetadataConnection( authProvider=AuthProvider.google, hostPort="http://localhost:8585/api", @@ -75,13 +81,16 @@ class TestSecretsManager(TestCase): def test_local_manager_add_service_config_connection(self): local_manager = get_secrets_manager(SecretsManagerProvider.local, None) - self.service.connection.config = self.database_connection - expected_service = deepcopy(self.service) + expected_service_connection = self.service_connection - local_manager.add_service_config_connection(self.service, self.service_type) + actual_service_connection: ServiceConnection = ( + local_manager.retrieve_service_connection(self.service, self.service_type) + ) - self.assertEqual(expected_service, self.service) - assert id(self.database_connection) == id(self.service.connection.config) + self.assertEqual(actual_service_connection, expected_service_connection) + assert id(actual_service_connection.__root__.config) == id( + expected_service_connection.__root__.config + ) def test_local_manager_add_auth_provider_security_config(self): local_manager = get_secrets_manager(SecretsManagerProvider.local, None) @@ -98,21 +107,23 @@ class TestSecretsManager(TestCase): aws_manager = self._build_secret_manager( boto3_mock, {"SecretString": json.dumps(DATABASE_CONNECTION)} ) - expected_service = deepcopy(self.service) - expected_service.connection.config = self.database_connection - self.service.connection = None + expected_service_connection = self.service_connection - aws_manager.add_service_config_connection(self.service, self.service_type) + actual_service_connection: ServiceConnection = ( + aws_manager.retrieve_service_connection(self.service, self.service_type) + ) - self.assertEqual(expected_service, self.service) - assert id(self.database_connection) != id(self.service.connection.config) + self.assertEqual(expected_service_connection, actual_service_connection) + assert id(actual_service_connection.__root__.config) != id( + expected_service_connection.__root__.config + ) @patch("metadata.utils.secrets_manager.boto3") def test_aws_manager_fails_add_auth_provider_security_config(self, mocked_boto3): aws_manager = self._build_secret_manager(mocked_boto3, {}) with self.assertRaises(ValueError) as value_error: - aws_manager.add_service_config_connection(self.service, self.service_type) + aws_manager.retrieve_service_connection(self.service, self.service_type) self.assertEqual( "[SecretString] not present in the response.", value_error.exception ) @@ -137,7 +148,7 @@ class TestSecretsManager(TestCase): aws_manager = self._build_secret_manager(mocked_boto3, {}) with self.assertRaises(ValueError) as value_error: - aws_manager.add_service_config_connection(self.service, self.service_type) + aws_manager.retrieve_service_connection(self.service, self.service_type) self.assertEqual( "[SecretString] not present in the response.", value_error.exception ) diff --git a/ingestion/tests/unit/test_source_connection.py b/ingestion/tests/unit/test_source_connection.py index 7351420db2b..2445c3f43b0 100644 --- a/ingestion/tests/unit/test_source_connection.py +++ b/ingestion/tests/unit/test_source_connection.py @@ -9,7 +9,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - from unittest import TestCase from metadata.generated.schema.entity.services.connections.database.athenaConnection import ( diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index 65623b04468..48fdae8af85 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -99,8 +99,6 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: if not service: raise ValueError(f"Could not get service from type {service_type}") - metadata.secrets_manager_client.add_service_config_connection(service, service_type) - return WorkflowSource( type=service.serviceType.value.lower(), serviceName=service.name.__root__, diff --git a/openmetadata-core/src/main/resources/json/schema/entity/services/serviceType.json b/openmetadata-core/src/main/resources/json/schema/entity/services/serviceType.json new file mode 100644 index 00000000000..7097df31a88 --- /dev/null +++ b/openmetadata-core/src/main/resources/json/schema/entity/services/serviceType.json @@ -0,0 +1,10 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/serviceType.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Service Type", + "description": "This schema defines the service types entities which requires a connection.", + "type": "string", + "javaType": "org.openmetadata.catalog.entity.services.ServiceType", + "enum": ["Dashboard", "Database", "Messaging", "Metadata", "MlModel", "Pipeline"], + "additionalProperties": false +} \ No newline at end of file diff --git a/openmetadata-core/src/main/resources/json/schema/metadataIngestion/workflow.json b/openmetadata-core/src/main/resources/json/schema/metadataIngestion/workflow.json index 134167555bd..955f38c7b90 100644 --- a/openmetadata-core/src/main/resources/json/schema/metadataIngestion/workflow.json +++ b/openmetadata-core/src/main/resources/json/schema/metadataIngestion/workflow.json @@ -66,7 +66,7 @@ } }, "additionalProperties": false, - "required": ["type", "serviceName", "serviceConnection", "sourceConfig"] + "required": ["type", "serviceName", "sourceConfig"] }, "processor": { "description": "Configuration for Processor Component in the OpenMetadata Ingestion Framework.",