mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-06 12:36:56 +00:00
* Parse connection JSON gracefully * Only check parsing * Format
This commit is contained in:
parent
81dc942370
commit
f3c107025a
50
ingestion/src/metadata/cli/ingest.py
Normal file
50
ingestion/src/metadata/cli/ingest.py
Normal file
@ -0,0 +1,50 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Ingest utility for the metadata CLI
|
||||
"""
|
||||
import pathlib
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
import click
|
||||
from pydantic import ValidationError
|
||||
|
||||
from metadata.config.common import load_config_file
|
||||
from metadata.ingestion.api.workflow import Workflow
|
||||
from metadata.utils.logger import cli_logger
|
||||
|
||||
logger = cli_logger()
|
||||
|
||||
|
||||
def run_ingest(config_path: str) -> None:
    """
    Load a JSON workflow configuration and run the ingestion workflow.

    This function always finishes through ``sys.exit``: with status 1 when
    creating the workflow raises a pydantic ``ValidationError``, otherwise
    with the value returned by ``workflow.print_status()``.

    :param config_path: Path to load JSON config
    """
    raw_config = load_config_file(pathlib.Path(config_path))

    try:
        logger.debug(f"Using config: {raw_config}")
        workflow = Workflow.create(raw_config)
    except ValidationError as validation_error:
        # Show the validation problem to the user on stderr and keep
        # the full traceback only at debug level.
        click.echo(validation_error, err=True)
        logger.debug(traceback.format_exc())
        sys.exit(1)

    # Run the workflow, stop it, and use the reported status as exit code
    workflow.execute()
    workflow.stop()
    sys.exit(workflow.print_status())
|
||||
@ -13,7 +13,6 @@ import logging
|
||||
import os
|
||||
import pathlib
|
||||
import sys
|
||||
import traceback
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
import click
|
||||
@ -22,13 +21,14 @@ from pydantic import ValidationError
|
||||
from metadata.__version__ import get_metadata_version
|
||||
from metadata.cli.backup import run_backup
|
||||
from metadata.cli.docker import run_docker
|
||||
from metadata.cli.ingest import run_ingest
|
||||
from metadata.config.common import load_config_file
|
||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
OpenMetadataWorkflowConfig,
|
||||
)
|
||||
from metadata.ingestion.api.workflow import Workflow
|
||||
from metadata.orm_profiler.api.workflow import ProfilerWorkflow
|
||||
from metadata.utils.logger import cli_logger, ingestion_logger, set_loggers_level
|
||||
from metadata.utils.logger import cli_logger, set_loggers_level
|
||||
|
||||
logger = cli_logger()
|
||||
|
||||
@ -72,23 +72,7 @@ def ingest(config: str) -> None:
|
||||
Main command for ingesting metadata into Metadata.
|
||||
Logging is controlled via the JSON config
|
||||
"""
|
||||
config_file = pathlib.Path(config)
|
||||
workflow_config = OpenMetadataWorkflowConfig.parse_obj(
|
||||
load_config_file(config_file)
|
||||
)
|
||||
set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
|
||||
|
||||
try:
|
||||
logger.debug(f"Using config: {workflow_config}")
|
||||
workflow = Workflow.create(workflow_config)
|
||||
except ValidationError as e:
|
||||
click.echo(e, err=True)
|
||||
logger.debug(traceback.print_exc())
|
||||
sys.exit(1)
|
||||
workflow.execute()
|
||||
workflow.stop()
|
||||
ret = workflow.print_status()
|
||||
sys.exit(ret)
|
||||
run_ingest(config_path=config)
|
||||
|
||||
|
||||
@metadata.command()
|
||||
|
||||
131
ingestion/src/metadata/ingestion/api/parser.py
Normal file
131
ingestion/src/metadata/ingestion/api/parser.py
Normal file
@ -0,0 +1,131 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Helper to parse workflow configurations
|
||||
"""
|
||||
from typing import Optional, Type, TypeVar, Union
|
||||
|
||||
from pydantic import BaseModel, ValidationError
|
||||
|
||||
from metadata.generated.schema.entity.services.dashboardService import (
|
||||
DashboardConnection,
|
||||
DashboardServiceType,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.databaseService import (
|
||||
DatabaseConnection,
|
||||
DatabaseServiceType,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.messagingService import (
|
||||
MessagingConnection,
|
||||
MessagingServiceType,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.metadataService import (
|
||||
MetadataConnection,
|
||||
MetadataServiceType,
|
||||
)
|
||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
OpenMetadataWorkflowConfig,
|
||||
)
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
|
||||
logger = ingestion_logger()
|
||||
|
||||
T = TypeVar("T", bound=BaseModel)
|
||||
|
||||
|
||||
def get_service_type(
    source_type: str,
) -> Union[
    Type[DashboardConnection],
    Type[DatabaseConnection],
    Type[MessagingConnection],
    Type[MetadataConnection],
]:
    """
    Resolve the service connection type that a source string belongs to

    :param source_type: source string, looked up among the member names
        of each service type enum
    :return: service connection type
    :raises ValueError: if the source string is not a member of any of
        the service type enums
    """
    # Lookup order matters only if a name appears in several enums:
    # database, dashboard, messaging, metadata (first match wins).
    enum_to_connection = (
        (DatabaseServiceType, DatabaseConnection),
        (DashboardServiceType, DashboardConnection),
        (MessagingServiceType, MessagingConnection),
        (MetadataServiceType, MetadataConnection),
    )

    for service_enum, connection_type in enum_to_connection:
        if source_type in service_enum.__members__:
            return connection_type

    raise ValueError(f"Cannot find the service type of {source_type}")
|
||||
|
||||
|
||||
def get_connection_class(
    source_type: str,
    service_type: Union[
        Type[DashboardConnection],
        Type[DatabaseConnection],
        Type[MessagingConnection],
        Type[MetadataConnection],
    ],
) -> Type[T]:
    """
    Build the connection class path, import and return it

    The returned object is the connection class itself (not an instance),
    hence the ``Type[T]`` return annotation.

    :param source_type: e.g., Glue
    :param service_type: e.g., DatabaseConnection
    :return: e.g., GlueConnection
    :raises ImportError: if the computed connection module cannot be imported
    :raises AttributeError: if the module does not define the expected class
    """
    # Local import keeps this function self-contained without touching
    # the module-level import block.
    import importlib

    # Get all the module path minus the file.
    # From metadata.generated.schema.entity.services.databaseService we get metadata.generated.schema.entity.services
    module_path = ".".join(service_type.__module__.split(".")[:-1])

    # DatabaseConnection -> database
    connection_path = service_type.__name__.lower().replace("connection", "")

    # Glue -> glueConnection (module file name uses a lower-cased first letter)
    connection_module = source_type[0].lower() + source_type[1:] + "Connection"

    class_name = source_type + "Connection"
    class_path = f"{module_path}.connections.{connection_path}.{connection_module}"

    # importlib.import_module returns the leaf module directly, which is the
    # documented replacement for calling __import__ with a fromlist.
    connection_class = getattr(importlib.import_module(class_path), class_name)

    return connection_class
|
||||
|
||||
|
||||
def parse_workflow_config_gracefully(
    config_dict: dict,
) -> Optional[OpenMetadataWorkflowConfig]:
    """
    This function either correctly parses the pydantic class, or
    throws a scoped error while fetching the required source connection
    class.

    If the full configuration fails validation, the source connection
    config is parsed on its own with its specific connection class so the
    error message points at that class. If the connection config turns out
    to be valid, the failure is somewhere else in the configuration and the
    original validation error is re-raised instead of being swallowed.

    :param config_dict: JSON workflow config
    :return: workflow config or scoped error
    :raises ValidationError: scoped to the connection class when the
        connection config is invalid, otherwise the original error
    :raises KeyError: if the config lacks source.serviceConnection.config.type
    :raises ValueError: if the source type matches no known service type
    """

    try:
        return OpenMetadataWorkflowConfig.parse_obj(config_dict)

    except ValidationError:

        # Unsafe access to the keys. Allow a KeyError if the config is not well formatted
        connection_config = config_dict["source"]["serviceConnection"]["config"]
        source_type = connection_config["type"]
        logger.error(
            f"Error parsing the Workflow Configuration for {source_type} ingestion"
        )

        service_type = get_service_type(source_type)
        connection_class = get_connection_class(source_type, service_type)

        # Parse the dictionary with the scoped class
        connection_class.parse_obj(connection_config)

        # The connection config is valid, so the validation problem lies in
        # another part of the workflow config. Re-raise the original error
        # rather than silently returning None to the caller.
        raise
|
||||
@ -22,17 +22,24 @@ from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
OpenMetadataWorkflowConfig,
|
||||
)
|
||||
from metadata.ingestion.api.bulk_sink import BulkSink
|
||||
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
|
||||
from metadata.ingestion.api.processor import Processor
|
||||
from metadata.ingestion.api.sink import Sink
|
||||
from metadata.ingestion.api.source import Source
|
||||
from metadata.ingestion.api.stage import Stage
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
from metadata.utils.logger import ingestion_logger, set_loggers_level
|
||||
|
||||
logger = ingestion_logger()
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class InvalidWorkflowJSONException(Exception):
    """
    Error signalling that the workflow JSON could not be parsed properly
    """
|
||||
|
||||
|
||||
class Workflow:
|
||||
config: OpenMetadataWorkflowConfig
|
||||
source: Source
|
||||
@ -44,6 +51,9 @@ class Workflow:
|
||||
|
||||
def __init__(self, config: OpenMetadataWorkflowConfig):
|
||||
self.config = config
|
||||
|
||||
set_loggers_level(config.workflowConfig.loggerLevel.value)
|
||||
|
||||
source_type = self.config.source.type.lower()
|
||||
source_class = self.get(
|
||||
"metadata.ingestion.source.{}.{}Source".format(
|
||||
@ -134,7 +144,7 @@ class Workflow:
|
||||
|
||||
@classmethod
|
||||
def create(cls, config_dict: dict) -> "Workflow":
|
||||
config = OpenMetadataWorkflowConfig.parse_obj(config_dict)
|
||||
config = parse_workflow_config_gracefully(config_dict)
|
||||
return cls(config)
|
||||
|
||||
def execute(self):
|
||||
|
||||
208
ingestion/tests/unit/test_workflow_parse.py
Normal file
208
ingestion/tests/unit/test_workflow_parse.py
Normal file
@ -0,0 +1,208 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Test Workflow pydantic parsing
|
||||
"""
|
||||
from unittest import TestCase
|
||||
|
||||
from pydantic import ValidationError
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.dashboard.tableauConnection import (
|
||||
TableauConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.database.glueConnection import (
|
||||
GlueConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.messaging.pulsarConnection import (
|
||||
PulsarConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.dashboardService import (
|
||||
DashboardConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
|
||||
from metadata.generated.schema.entity.services.messagingService import (
|
||||
MessagingConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.metadataService import MetadataConnection
|
||||
from metadata.ingestion.api.parser import (
|
||||
get_connection_class,
|
||||
get_service_type,
|
||||
parse_workflow_config_gracefully,
|
||||
)
|
||||
|
||||
|
||||
class TestWorkflowParse(TestCase):
    """
    Parsing scenarios for workflow JSON configurations
    """

    def test_get_service_type(self):
        """
        Known source names resolve to their service connection type and
        an unknown name raises a ValueError
        """
        expected_by_source = (
            ("Mysql", DatabaseConnection),
            ("Looker", DashboardConnection),
            ("Kafka", MessagingConnection),
            ("Amundsen", MetadataConnection),
        )
        for source_name, expected_service in expected_by_source:
            self.assertEqual(get_service_type(source_name), expected_service)

        with self.assertRaises(ValueError) as err:
            get_service_type("random")

        self.assertEqual("Cannot find the service type of random", str(err.exception))

    def test_get_connection_class(self):
        """
        The connection class is correctly located and imported for one
        source of each service kind
        """
        expected_by_source = (
            ("Glue", GlueConnection),
            ("Tableau", TableauConnection),
            ("OpenMetadata", OpenMetadataConnection),
            ("Pulsar", PulsarConnection),
        )
        for source_type, expected_class in expected_by_source:
            connection = get_connection_class(
                source_type, get_service_type(source_type)
            )
            self.assertEqual(connection, expected_class)

    def test_parsing_ok(self):
        """
        A valid MSSQL JSON config is parsed into a workflow config
        """
        connection_config = {
            "type": "Mssql",
            "database": "master",
            "username": "sa",
            "password": "MY%password",
            "hostPort": "random:1433",
        }
        source_config = {
            "enableDataProfiler": True,
            "sampleDataQuery": "select top 50 * from [{}].[{}]",
        }
        server_config = {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "no-auth",
        }
        config_dict = {
            "source": {
                "type": "mssql",
                "serviceName": "test_mssql",
                "serviceConnection": {"config": connection_config},
                "sourceConfig": {"config": source_config},
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "loggerLevel": "WARN",
                "openMetadataServerConfig": server_config,
            },
        }

        parsed = parse_workflow_config_gracefully(config_dict)
        self.assertIsNotNone(parsed)

    def test_parsing_ko_mssql(self):
        """
        An MSSQL JSON config with an unexpected extra connection field
        fails with an error scoped to MssqlConnection
        """
        connection_config = {
            "type": "Mssql",
            "database": "master",
            "username": "sa",
            "password": "MY%password",
            "hostPort": "localhost:1433",
            "random": "extra",
        }
        source_config = {
            "enableDataProfiler": True,
            "sampleDataQuery": "select top 50 * from [{}].[{}]",
        }
        server_config = {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "no-auth",
        }
        config_dict = {
            "source": {
                "type": "mssql",
                "serviceName": "test_mssql",
                "serviceConnection": {"config": connection_config},
                "sourceConfig": {"config": source_config},
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "loggerLevel": "WARN",
                "openMetadataServerConfig": server_config,
            },
        }

        with self.assertRaises(ValidationError) as err:
            parse_workflow_config_gracefully(config_dict)

        self.assertIn("1 validation error for MssqlConnection", str(err.exception))

    def test_parsing_ko_glue(self):
        """
        A Glue JSON config with an invalid connection fails (KO) with an
        error scoped to GlueConnection
        """
        aws_config = {
            "awsSecretAccessKey": "aws secret access key",
            "awsRegion": "aws region",
            "endPointURL": "https://glue.<region_name>.amazonaws.com/",
        }
        connection_config = {
            "type": "Glue",
            "awsConfig": aws_config,
            "database": "local_glue_db",
            "storageServiceName": "storage_name",
            "pipelineServiceName": "pipeline_name",
            "random": "extra",
        }
        server_config = {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "no-auth",
        }
        config_dict = {
            "source": {
                "type": "glue",
                "serviceName": "local_glue",
                "serviceConnection": {"config": connection_config},
                "sourceConfig": {"config": {"enableDataProfiler": False}},
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {"openMetadataServerConfig": server_config},
        }

        with self.assertRaises(ValidationError) as err:
            parse_workflow_config_gracefully(config_dict)

        self.assertIn("2 validation errors for GlueConnection", str(err.exception))
|
||||
Loading…
x
Reference in New Issue
Block a user