# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Workflow pydantic parsing
"""
from unittest import TestCase

from pydantic import ValidationError

from metadata.generated.schema.entity.automations.testServiceConnection import (
    TestServiceConnectionRequest,
)
from metadata.generated.schema.entity.services.connections.dashboard.tableauConnection import (
    TableauConnection,
)
from metadata.generated.schema.entity.services.connections.database.glueConnection import (
    GlueConnection,
)
from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
    KafkaConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.dashboardService import (
    DashboardConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.entity.services.messagingService import (
    MessagingConnection,
)
from metadata.generated.schema.entity.services.metadataService import MetadataConnection
from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import (
    DashboardServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
    DatabaseServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
    DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import (
    PipelineServiceMetadataPipeline,
)
from metadata.ingestion.api.parser import (
    ParsingConfigurationError,
    get_connection_class,
    get_service_type,
    get_source_config_class,
    parse_test_connection_request_gracefully,
    parse_workflow_config_gracefully,
)


class TestWorkflowParse(TestCase):
    """
    Test parsing scenarios of JSON Schemas
    """

    def test_get_service_type(self):
        """
        Test that we can get the service type of a source
        """
        database_service = get_service_type("Mysql")
        self.assertEqual(database_service, DatabaseConnection)
        dashboard_service = get_service_type("Looker")
        self.assertEqual(dashboard_service, DashboardConnection)
        messaging_service = get_service_type("Kafka")
        self.assertEqual(messaging_service, MessagingConnection)
        metadata_service = get_service_type("Amundsen")
        self.assertEqual(metadata_service, MetadataConnection)
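        # An unknown source type should raise a ValueError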
        with self.assertRaises(ValueError) as err:
            get_service_type("random")
        self.assertEqual("Cannot find the service type of random", str(err.exception))

    def test_get_connection_class(self):
        """
        Check that we can correctly build the connection module ingredients
        """
        source_type = "Glue"
        connection = get_connection_class(source_type, get_service_type(source_type))
        self.assertEqual(connection, GlueConnection)
        source_type = "Tableau"
        connection = get_connection_class(source_type, get_service_type(source_type))
        self.assertEqual(connection, TableauConnection)
        source_type = "OpenMetadata"
        connection = get_connection_class(source_type, get_service_type(source_type))
        self.assertEqual(connection, OpenMetadataConnection)
        source_type = "Kafka"
        connection = get_connection_class(source_type, get_service_type(source_type))
        self.assertEqual(connection, KafkaConnection)

    def test_get_source_config_class(self):
        """
        Check that we can correctly build the source config class
        """
        source_config_type = "Profiler"
        connection = get_source_config_class(source_config_type)
        self.assertEqual(connection, DatabaseServiceProfilerPipeline)
        source_config_type = "DatabaseMetadata"
        connection = get_source_config_class(source_config_type)
        self.assertEqual(connection, DatabaseServiceMetadataPipeline)
        source_config_type = "PipelineMetadata"
        connection = get_source_config_class(source_config_type)
        self.assertEqual(connection, PipelineServiceMetadataPipeline)
        source_config_type = "DashboardMetadata"
        connection = get_source_config_class(source_config_type)
        self.assertEqual(connection, DashboardServiceMetadataPipeline)

    def test_parsing_ok(self):
        """
        Test MSSQL JSON Config parsing OK
        """
        config_dict = {
            "source": {
                "type": "mssql",
                "serviceName": "test_mssql",
                "serviceConnection": {
                    "config": {
                        "type": "Mssql",
                        "database": "master",
                        "username": "sa",
                        "password": "MY%password",
                        "hostPort": "random:1433",
                    }
                },
                "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "loggerLevel": "WARN",
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "no-auth",
                },
            },
        }
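        # A well-formed config should parse into a non-None workflow config object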
        self.assertIsNotNone(parse_workflow_config_gracefully(config_dict))

    def test_parsing_ko_mssql(self):
        """
        Test MSSQL JSON Config parsing KO
        """
        config_dict = {
            "source": {
                "type": "mssql",
                "serviceName": "test_mssql",
                "serviceConnection": {
                    "config": {
                        "type": "Mssql",
                        "database": "master",
                        "username": "sa",
                        "password": "MY%password",
                        "hostPort": "localhost:1433",
                        "random": "extra",
                    }
                },
                "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "loggerLevel": "WARN",
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "no-auth",
                },
            },
        }
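        # The extra "random" key in the service connection should surface as a ParsingConfigurationError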
        with self.assertRaises(ParsingConfigurationError) as err:
            parse_workflow_config_gracefully(config_dict)
        self.assertIn(
            "We encountered an error parsing the configuration of your MssqlConnection.\nYou might need to review your config based on the original cause of this failure:\n\t - Extra parameter 'random'",
            str(err.exception),
        )

    def test_parsing_ko_mssql_source_config(self):
        """
        Test MSSQL JSON Config parsing KO
        """
        config_dict = {
            "source": {
                "type": "mssql",
                "serviceName": "test_mssql",
                "serviceConnection": {
                    "config": {
                        "type": "Mssql",
                        "database": "master",
                        "username": "sa",
                        "password": "MY%password",
                        "hostPort": "localhost:1433",
                    }
                },
                "sourceConfig": {
                    "config": {"type": "DatabaseMetadata", "random": "extra"}
                },
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "loggerLevel": "WARN",
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "no-auth",
                },
            },
        }
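        # The extra "random" key in the source config should surface as a ParsingConfigurationError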
        with self.assertRaises(ParsingConfigurationError) as err:
            parse_workflow_config_gracefully(config_dict)
        self.assertIn(
            "We encountered an error parsing the configuration of your DatabaseServiceMetadataPipeline.\nYou might need to review your config based on the original cause of this failure:\n\t - Extra parameter 'random'",
            str(err.exception),
        )

    def test_parsing_ko_glue(self):
        """
        Test Glue JSON Config parsing KO
        """
        config_dict = {
            "source": {
                "type": "glue",
                "serviceName": "local_glue",
                "serviceConnection": {
                    "config": {
                        "type": "Glue",
                        "awsConfig": {
                            "awsSecretAccessKey": "aws secret access key",
                            "awsRegion": "aws region",
                            "endPointURL": "https://glue.<region_name>.amazonaws.com/",
                        },
                        "random": "extra",
                    }
                },
                "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "no-auth",
                }
            },
        }
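        # The extra "random" key in the Glue connection should surface as a ParsingConfigurationError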
        with self.assertRaises(ParsingConfigurationError) as err:
            parse_workflow_config_gracefully(config_dict)
        self.assertIn(
            "We encountered an error parsing the configuration of your GlueConnection.\nYou might need to review your config based on the original cause of this failure:\n\t - Extra parameter 'random'",
            str(err.exception),
        )

    def test_parsing_ko_airbyte(self):
        """
        Test Airbyte JSON Config parsing KO
        """
        config_dict = {
            "source": {
                "type": "airbyte",
                "serviceName": "local_airbyte",
                "serviceConnection": {
                    "config": {"type": "Airbyte", "hostPort": "http://localhost:8000"}
                },
                "sourceConfig": {
                    "config": {"type": "PipelineMetadata", "random": "extra"}
                },
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "no-auth",
                }
            },
        }
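        # The extra "random" key in the pipeline source config should surface as a ParsingConfigurationError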
        with self.assertRaises(ParsingConfigurationError) as err:
            parse_workflow_config_gracefully(config_dict)
        self.assertIn(
            "We encountered an error parsing the configuration of your PipelineServiceMetadataPipeline.\nYou might need to review your config based on the original cause of this failure:\n\t - Extra parameter 'random'",
            str(err.exception),
        )

    def test_test_connection_mysql(self):
        """
        Test the TestConnection for MySQL
        """
        config_dict = {
            "connection": {
                "config": {
                    "type": "Mysql",
                    "username": "openmetadata_user",
                    "password": "openmetadata_password",
                    "hostPort": "localhost:3306",
                }
            },
            "connectionType": "Database",
        }
        self.assertIsInstance(
            parse_test_connection_request_gracefully(config_dict),
            TestServiceConnectionRequest,
        )

        config_dict_ko = {
            "connection": {
                "config": {
                    "type": "Mysql",
                    "username": "openmetadata_user",
                    "password": "openmetadata_password",
                }
            },
            "connectionType": "Database",
        }
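        # A missing hostPort should fail MysqlConnection validation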
        with self.assertRaises(ValidationError) as err:
            parse_test_connection_request_gracefully(config_dict_ko)
        self.assertIn(
            "1 validation error for MysqlConnection\nhostPort\n  field required (type=value_error.missing)",
            str(err.exception),
        )

        config_dict_ko2 = {
            "connection": {
                "config": {
                    "type": "Mysql",
                    "username": "openmetadata_user",
                    "password": "openmetadata_password",
                    "hostPort": "localhost:3306",
                    "random": "value",
                }
            },
            "connectionType": "Database",
        }
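        # An unexpected extra field should be rejected by the MysqlConnection model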
        with self.assertRaises(ValidationError) as err:
            parse_test_connection_request_gracefully(config_dict_ko2)
        self.assertIn(
            "1 validation error for MysqlConnection\nrandom\n  extra fields not permitted (type=value_error.extra)",
            str(err.exception),
        )