From 2507b7006a0f7de99ce865022d88d5bb1bc057d3 Mon Sep 17 00:00:00 2001 From: Teddy Crepineau Date: Fri, 26 Nov 2021 16:16:34 +0100 Subject: [PATCH] issue-1049: Refactor code to use functions from utils.helper --- .../metadata/ingestion/source/sample_data.py | 77 ++++++------------- .../ingestion/source/sample_entity.py | 1 - .../metadata/ingestion/source/sample_usage.py | 7 +- 3 files changed, 27 insertions(+), 58 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/sample_data.py b/ingestion/src/metadata/ingestion/source/sample_data.py index 8093e788245..8bd7ffb96c0 100644 --- a/ingestion/src/metadata/ingestion/source/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/sample_data.py @@ -58,7 +58,12 @@ from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.models.user import User from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig -from metadata.utils.helpers import get_database_service_or_create +from metadata.utils.helpers import ( + get_database_service_or_create, + get_messaging_service_or_create, + get_dashboard_service_or_create, + get_pipeline_service_or_create, +) logger: logging.Logger = logging.getLogger(__name__) @@ -75,54 +80,6 @@ class InvalidSampleDataException(Exception): """ -def get_database_service_or_create(service_json, metadata_config) -> DatabaseService: - metadata = OpenMetadata(metadata_config) - service = metadata.get_by_name(entity=DatabaseService, fqdn=service_json["name"]) - if service is not None: - return service - else: - created_service = metadata.create_or_update( - CreateDatabaseServiceEntityRequest(**service_json) - ) - return created_service - - -def get_messaging_service_or_create(service_json, metadata_config) -> MessagingService: - metadata = OpenMetadata(metadata_config) - service = metadata.get_by_name(entity=MessagingService, fqdn=service_json["name"]) - if service is not None: - return service - else: - created_service = metadata.create_or_update( - CreateMessagingServiceEntityRequest(**service_json) - ) - return created_service - - -def get_dashboard_service_or_create(service_json, metadata_config) -> DashboardService: - metadata = OpenMetadata(metadata_config) - service = metadata.get_by_name(entity=DashboardService, fqdn=service_json["name"]) - if service is not None: - return service - else: - created_service = metadata.create_or_update( - CreateDashboardServiceEntityRequest(**service_json) - ) - return created_service - - -def get_pipeline_service_or_create(service_json, metadata_config) -> PipelineService: - metadata = OpenMetadata(metadata_config) - service = metadata.get_by_name(entity=PipelineService, fqdn=service_json["name"]) - if service is not None: - return service - else: - created_service = metadata.create_or_update( - CreatePipelineServiceEntityRequest(**service_json) - ) - return created_service - - def get_lineage_entity_ref(edge, metadata_config) -> EntityReference: metadata = OpenMetadata(metadata_config) fqn = edge["fqn"] @@ -152,10 +109,14 @@ class SampleDataSourceConfig(ConfigModel): database: str = "warehouse" service_type: str = "BigQuery" scheme = "bigquery+pymysql" + host_port = "9999" def get_sample_data_folder(self): return self.sample_data_folder + def get_service_type(self): + return self.service_type + @dataclass class SampleDataSourceStatus(SourceStatus): @@ -288,7 +249,7 @@ class SampleDataSource(Source): open(self.config.sample_data_folder + "/datasets/tables.json", "r") ) self.database_service = get_database_service_or_create( - self.database_service_json, self.metadata_config + config, self.metadata_config ) self.kafka_service_json = json.load( open(self.config.sample_data_folder + "/topics/service.json", "r") @@ -297,7 +258,11 @@ class SampleDataSource(Source): open(self.config.sample_data_folder + "/topics/topics.json", "r") ) self.kafka_service = get_messaging_service_or_create( - self.kafka_service_json, self.metadata_config + self.kafka_service_json.get("name"), + self.kafka_service_json.get("serviceType"), + self.kafka_service_json.get("schemaRegistry"), + self.kafka_service_json.get("brokers"), + self.metadata_config, ) self.dashboard_service_json = json.load( open(self.config.sample_data_folder + "/dashboards/service.json", "r") @@ -309,7 +274,12 @@ class SampleDataSource(Source): open(self.config.sample_data_folder + "/dashboards/dashboards.json", "r") ) self.dashboard_service = get_dashboard_service_or_create( - self.dashboard_service_json, metadata_config + self.dashboard_service_json.get("name"), + self.dashboard_service_json.get("serviceType"), + self.dashboard_service_json.get("username"), + self.dashboard_service_json.get("password"), + self.dashboard_service_json.get("dashboardUrl"), + metadata_config, ) self.pipeline_service_json = json.load( open(self.config.sample_data_folder + "/pipelines/service.json", "r") @@ -318,7 +288,8 @@ class SampleDataSource(Source): open(self.config.sample_data_folder + "/pipelines/pipelines.json", "r") ) self.pipeline_service = get_pipeline_service_or_create( - self.pipeline_service_json, metadata_config + SampleDataSourceConfig.parse_obj(self.pipeline_service_json), + metadata_config, ) self.lineage = json.load( open(self.config.sample_data_folder + "/lineage/lineage.json", "r") diff --git a/ingestion/src/metadata/ingestion/source/sample_entity.py b/ingestion/src/metadata/ingestion/source/sample_entity.py index a513754870b..606ba2ef376 100644 --- a/ingestion/src/metadata/ingestion/source/sample_entity.py +++ b/ingestion/src/metadata/ingestion/source/sample_entity.py @@ -30,7 +30,6 @@ from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.ingestion.processor.pii import ColumnNameScanner -from metadata.ingestion.source.sample_data import get_database_service_or_create from metadata.ingestion.source.sql_source import SQLConnectionConfig from metadata.utils.helpers import snake_to_camel diff --git a/ingestion/src/metadata/ingestion/source/sample_usage.py b/ingestion/src/metadata/ingestion/source/sample_usage.py index 1191820cbf9..643bd9fda48 100644 --- a/ingestion/src/metadata/ingestion/source/sample_usage.py +++ b/ingestion/src/metadata/ingestion/source/sample_usage.py @@ -5,12 +5,11 @@ from typing import Iterable from metadata.ingestion.api.source import Source from metadata.ingestion.models.table_queries import TableQuery - -from ..ometa.openmetadata_rest import MetadataServerConfig -from .sample_data import ( +from metadata.utils.helper import get_database_service_or_create +from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig +from metadata.ingestion.source.sample_data import ( SampleDataSourceConfig, SampleDataSourceStatus, - get_database_service_or_create, )