From d783869ed6337376c1a3ac843685efc8f0d29a15 Mon Sep 17 00:00:00 2001 From: NiharDoshi99 <51595473+NiharDoshi99@users.noreply.github.com> Date: Wed, 16 Nov 2022 11:11:54 +0530 Subject: [PATCH] Fix: Added metadata service (#8759) * Fix: Added metadata service * Fix as per comment * Fix changes for py-test * Fix changes for py-test * Fix py-checkstyle --- .../metadata/clients/connection_clients.py | 6 ++++ .../ingestion/source/metadata/amundsen.py | 25 +++++--------- ingestion/src/metadata/utils/connections.py | 34 +++++++++++++++++++ .../workflows/ingestion/common.py | 6 ++++ .../testServiceConnection.json | 8 ++++- 5 files changed, 61 insertions(+), 18 deletions(-) diff --git a/ingestion/src/metadata/clients/connection_clients.py b/ingestion/src/metadata/clients/connection_clients.py index 28d2fc36739..899b0855951 100644 --- a/ingestion/src/metadata/clients/connection_clients.py +++ b/ingestion/src/metadata/clients/connection_clients.py @@ -157,3 +157,9 @@ class DomoClient: class NifiClientWrapper: def __init__(self, client) -> None: self.client = client + + +@dataclass +class AmundsenClient: + def __init__(self, client) -> None: + self.client = client diff --git a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py index 6974ee13914..a5e6e2e42b0 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py +++ b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py @@ -19,7 +19,6 @@ from typing import Iterable, List, Optional from pydantic import SecretStr from sqlalchemy.engine.url import make_url -from metadata.clients.neo4j_client import Neo4JConfig, Neo4jHelper from metadata.config.common import ConfigModel from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest @@ -68,6 +67,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.utils import fqn from metadata.utils.amundsen_helper import SERVICE_TYPE_MAPPER +from metadata.utils.connections import get_connection from metadata.utils.helpers import get_standard_chart_type from metadata.utils.logger import ingestion_logger from metadata.utils.sql_queries import ( @@ -135,15 +135,8 @@ class AmundsenSource(Source[Entity]): self.database_object = None self.metadata = OpenMetadata(self.metadata_config) self.service_connection = self.config.serviceConnection.__root__.config - neo4j_config = Neo4JConfig( - username=self.service_connection.username, - password=self.service_connection.password.get_secret_value(), - neo4j_url=self.service_connection.hostPort, - max_connection_life_time=self.service_connection.maxConnectionLifeTime, - neo4j_encrypted=self.service_connection.encrypted, - neo4j_validate_ssl=self.service_connection.validateSSL, - ) - self.neo4j_helper = Neo4jHelper(neo4j_config) + self.connection = get_connection(self.service_connection) + self.client = self.connection.client self.status = AmundsenStatus() self.database_service_map = { service.value.lower(): service.value for service in DatabaseServiceType @@ -164,18 +157,16 @@ class AmundsenSource(Source[Entity]): pass def next_record(self) -> Iterable[Entity]: - table_entities = self.neo4j_helper.execute_query(NEO4J_AMUNDSEN_TABLE_QUERY) + table_entities = self.client.execute_query(NEO4J_AMUNDSEN_TABLE_QUERY) for table in table_entities: yield from self.create_table_entity(table) - user_entities = self.neo4j_helper.execute_query(NEO4J_AMUNDSEN_USER_QUERY) + user_entities = self.client.execute_query(NEO4J_AMUNDSEN_USER_QUERY) for user in user_entities: yield from self.create_user_entity(user) yield from self.add_owner_to_entity(user) - dashboard_entities = self.neo4j_helper.execute_query( - NEO4J_AMUNDSEN_DASHBOARD_QUERY - ) + dashboard_entities = self.client.execute_query(NEO4J_AMUNDSEN_DASHBOARD_QUERY) for dashboard in dashboard_entities: yield from self.create_dashboard_service(dashboard) yield from self.create_chart_entity(dashboard) @@ -486,8 +477,8 @@ class AmundsenSource(Source[Entity]): yield chart def close(self): - if self.neo4j_helper is not None: - self.neo4j_helper.close() + if self.client is not None: + self.client.close() def get_status(self) -> SourceStatus: return self.status diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py index 97648929eff..1040a0791ae 100644 --- a/ingestion/src/metadata/utils/connections.py +++ b/ingestion/src/metadata/utils/connections.py @@ -31,6 +31,7 @@ from sqlalchemy.pool import QueuePool from metadata.clients.connection_clients import ( AirByteClient, + AmundsenClient, DagsterClient, DatalakeClient, DeltaLakeClient, @@ -123,6 +124,9 @@ from metadata.generated.schema.entity.services.connections.messaging.kinesisConn from metadata.generated.schema.entity.services.connections.messaging.redpandaConnection import ( RedpandaConnection, ) +from metadata.generated.schema.entity.services.connections.metadata.amundsenConnection import ( + AmundsenConnection, +) from metadata.generated.schema.entity.services.connections.mlmodel.mlflowConnection import ( MlflowConnection, ) @@ -163,6 +167,7 @@ from metadata.utils.source_connections import ( singledispatch_with_options_secrets, update_connection_opts_args, ) +from metadata.utils.sql_queries import NEO4J_AMUNDSEN_USER_QUERY from metadata.utils.timeout import timeout logger = logging.getLogger("Utils") @@ -1237,3 +1242,32 @@ def _(connection: DomoClient) -> None: except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." raise SourceConnectionException(msg) + + +@get_connection.register +def _(connection: AmundsenConnection) -> AmundsenClient: + + from metadata.clients.neo4j_client import Neo4JConfig, Neo4jHelper + + try: + neo4j_config = Neo4JConfig( + username=connection.username, + password=connection.password.get_secret_value(), + neo4j_url=connection.hostPort, + max_connection_life_time=connection.maxConnectionLifeTime, + neo4j_encrypted=connection.encrypted, + neo4j_validate_ssl=connection.validateSSL, + ) + return AmundsenClient(Neo4jHelper(neo4j_config)) + except Exception as exc: + msg = f"Unknown error connecting with {connection}: {exc}." + raise SourceConnectionException(msg) + + +@test_connection.register +def _(connection: AmundsenClient) -> None: + try: + connection.client.execute_query(query=NEO4J_AMUNDSEN_USER_QUERY) + except Exception as exc: + msg = f"Unknown error connecting with {connection}: {exc}." + raise SourceConnectionException(msg) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index 3d83ef47b8d..8d1227a3faa 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -25,6 +25,7 @@ from metadata.data_insight.api.workflow import DataInsightWorkflow from metadata.generated.schema.entity.services.dashboardService import DashboardService from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.messagingService import MessagingService +from metadata.generated.schema.entity.services.metadataService import MetadataService from metadata.generated.schema.entity.services.mlmodelService import MlModelService from metadata.generated.schema.entity.services.pipelineService import PipelineService from metadata.generated.schema.tests.testSuite import TestSuite @@ -146,6 +147,11 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: service: MlModelService = metadata.get_by_name( entity=entity_class, fqn=ingestion_pipeline.service.name ) + elif service_type == "metadataService": + entity_class = MetadataService + service: MetadataService = metadata.get_by_name( + entity=entity_class, fqn=ingestion_pipeline.service.name + ) else: raise InvalidServiceException(f"Invalid Service Type: {service_type}") except ValidationError as original_error: diff --git a/openmetadata-spec/src/main/resources/json/schema/api/services/ingestionPipelines/testServiceConnection.json b/openmetadata-spec/src/main/resources/json/schema/api/services/ingestionPipelines/testServiceConnection.json index 2fe28b9d7e6..b7d972e8808 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/services/ingestionPipelines/testServiceConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/services/ingestionPipelines/testServiceConnection.json @@ -22,13 +22,16 @@ }, { "$ref": "../../../entity/services/mlmodelService.json#/definitions/mlModelConnection" + }, + { + "$ref": "../../../entity/services/metadataService.json#/definitions/metadataConnection" } ] }, "connectionType": { "description": "Type of service such as Database, Dashboard, Messaging, etc.", "type": "string", - "enum": ["Database", "Dashboard", "Messaging", "Pipeline", "MlModel"], + "enum": ["Database", "Dashboard", "Messaging", "Pipeline", "MlModel", "Metadata"], "javaEnums": [ { "name": "Database" @@ -44,6 +47,9 @@ }, { "name": "MlModel" + }, + { + "name": "Metadata" } ] },