Merge pull request #1409 from TeddyCr/issue-1049

issue-1049: Refactor code to use functions from utils.helper
This commit is contained in:
Pere Miquel Brull 2021-11-26 18:51:01 +01:00 committed by GitHub
commit fdd6ca14b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 27 additions and 58 deletions

View File

@ -58,7 +58,12 @@ from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.models.user import User
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.utils.helpers import get_database_service_or_create
from metadata.utils.helpers import (
get_database_service_or_create,
get_messaging_service_or_create,
get_dashboard_service_or_create,
get_pipeline_service_or_create,
)
logger: logging.Logger = logging.getLogger(__name__)
@ -75,54 +80,6 @@ class InvalidSampleDataException(Exception):
"""
def get_database_service_or_create(service_json, metadata_config) -> DatabaseService:
metadata = OpenMetadata(metadata_config)
service = metadata.get_by_name(entity=DatabaseService, fqdn=service_json["name"])
if service is not None:
return service
else:
created_service = metadata.create_or_update(
CreateDatabaseServiceEntityRequest(**service_json)
)
return created_service
def get_messaging_service_or_create(service_json, metadata_config) -> MessagingService:
metadata = OpenMetadata(metadata_config)
service = metadata.get_by_name(entity=MessagingService, fqdn=service_json["name"])
if service is not None:
return service
else:
created_service = metadata.create_or_update(
CreateMessagingServiceEntityRequest(**service_json)
)
return created_service
def get_dashboard_service_or_create(service_json, metadata_config) -> DashboardService:
metadata = OpenMetadata(metadata_config)
service = metadata.get_by_name(entity=DashboardService, fqdn=service_json["name"])
if service is not None:
return service
else:
created_service = metadata.create_or_update(
CreateDashboardServiceEntityRequest(**service_json)
)
return created_service
def get_pipeline_service_or_create(service_json, metadata_config) -> PipelineService:
metadata = OpenMetadata(metadata_config)
service = metadata.get_by_name(entity=PipelineService, fqdn=service_json["name"])
if service is not None:
return service
else:
created_service = metadata.create_or_update(
CreatePipelineServiceEntityRequest(**service_json)
)
return created_service
def get_lineage_entity_ref(edge, metadata_config) -> EntityReference:
metadata = OpenMetadata(metadata_config)
fqn = edge["fqn"]
@ -152,10 +109,14 @@ class SampleDataSourceConfig(ConfigModel):
database: str = "warehouse"
service_type: str = "BigQuery"
scheme = "bigquery+pymysql"
host_port = "9999"
def get_sample_data_folder(self):
return self.sample_data_folder
def get_service_type(self):
return self.service_type
@dataclass
class SampleDataSourceStatus(SourceStatus):
@ -288,7 +249,7 @@ class SampleDataSource(Source):
open(self.config.sample_data_folder + "/datasets/tables.json", "r")
)
self.database_service = get_database_service_or_create(
self.database_service_json, self.metadata_config
config, self.metadata_config
)
self.kafka_service_json = json.load(
open(self.config.sample_data_folder + "/topics/service.json", "r")
@ -297,7 +258,11 @@ class SampleDataSource(Source):
open(self.config.sample_data_folder + "/topics/topics.json", "r")
)
self.kafka_service = get_messaging_service_or_create(
self.kafka_service_json, self.metadata_config
self.kafka_service_json.get("name"),
self.kafka_service_json.get("serviceType"),
self.kafka_service_json.get("schemaRegistry"),
self.kafka_service_json.get("brokers"),
self.metadata_config,
)
self.dashboard_service_json = json.load(
open(self.config.sample_data_folder + "/dashboards/service.json", "r")
@ -309,7 +274,12 @@ class SampleDataSource(Source):
open(self.config.sample_data_folder + "/dashboards/dashboards.json", "r")
)
self.dashboard_service = get_dashboard_service_or_create(
self.dashboard_service_json, metadata_config
self.dashboard_service_json.get("name"),
self.dashboard_service_json.get("serviceType"),
self.dashboard_service_json.get("username"),
self.dashboard_service_json.get("password"),
self.dashboard_service_json.get("dashboardUrl"),
metadata_config,
)
self.pipeline_service_json = json.load(
open(self.config.sample_data_folder + "/pipelines/service.json", "r")
@ -318,7 +288,8 @@ class SampleDataSource(Source):
open(self.config.sample_data_folder + "/pipelines/pipelines.json", "r")
)
self.pipeline_service = get_pipeline_service_or_create(
self.pipeline_service_json, metadata_config
SampleDataSourceConfig.parse_obj(self.pipeline_service_json),
metadata_config,
)
self.lineage = json.load(
open(self.config.sample_data_folder + "/lineage/lineage.json", "r")

View File

@ -30,7 +30,6 @@ from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.processor.pii import ColumnNameScanner
from metadata.ingestion.source.sample_data import get_database_service_or_create
from metadata.ingestion.source.sql_source import SQLConnectionConfig
from metadata.utils.helpers import snake_to_camel

View File

@ -5,12 +5,11 @@ from typing import Iterable
from metadata.ingestion.api.source import Source
from metadata.ingestion.models.table_queries import TableQuery
from ..ometa.openmetadata_rest import MetadataServerConfig
from .sample_data import (
from metadata.utils.helper import get_database_service_or_create
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sample_data import (
SampleDataSourceConfig,
SampleDataSourceStatus,
get_database_service_or_create,
)