# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Workflow pydantic parsing
"""
from unittest import TestCase
from pydantic import ValidationError
2023-12-22 09:56:39 +05:30
from metadata . generated . schema . entity . automations . workflow import (
Workflow as AutomationWorkflow ,
2022-07-18 18:50:27 +02:00
)
2024-10-08 14:39:55 +05:30
from metadata . generated . schema . entity . services . connections . api . restConnection import (
RestConnection ,
)
2022-04-29 18:20:10 +02:00
from metadata . generated . schema . entity . services . connections . dashboard . tableauConnection import (
TableauConnection ,
)
from metadata . generated . schema . entity . services . connections . database . glueConnection import (
GlueConnection ,
)
2023-04-23 18:43:46 +02:00
from metadata . generated . schema . entity . services . connections . messaging . kafkaConnection import (
KafkaConnection ,
2022-04-29 18:20:10 +02:00
)
from metadata . generated . schema . entity . services . connections . metadata . openMetadataConnection import (
OpenMetadataConnection ,
)
from metadata . generated . schema . entity . services . dashboardService import (
DashboardConnection ,
)
from metadata . generated . schema . entity . services . databaseService import DatabaseConnection
2023-12-22 09:56:39 +05:30
from metadata . generated . schema . entity . services . ingestionPipelines . ingestionPipeline import (
IngestionPipeline ,
)
2022-04-29 18:20:10 +02:00
from metadata . generated . schema . entity . services . messagingService import (
MessagingConnection ,
)
from metadata . generated . schema . entity . services . metadataService import MetadataConnection
2024-09-11 13:36:53 +05:30
from metadata . generated . schema . metadataIngestion . apiServiceMetadataPipeline import (
ApiServiceMetadataPipeline ,
)
2022-07-27 23:51:16 +05:30
from metadata . generated . schema . metadataIngestion . dashboardServiceMetadataPipeline import (
DashboardServiceMetadataPipeline ,
)
from metadata . generated . schema . metadataIngestion . databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline ,
)
from metadata . generated . schema . metadataIngestion . databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline ,
)
from metadata . generated . schema . metadataIngestion . pipelineServiceMetadataPipeline import (
PipelineServiceMetadataPipeline ,
)
2022-04-29 18:20:10 +02:00
from metadata . ingestion . api . parser import (
2022-09-19 17:00:00 +02:00
ParsingConfigurationError ,
2022-04-29 18:20:10 +02:00
get_connection_class ,
get_service_type ,
2022-07-27 23:51:16 +05:30
get_source_config_class ,
2023-12-22 09:56:39 +05:30
parse_automation_workflow_gracefully ,
parse_ingestion_pipeline_config_gracefully ,
2022-04-29 18:20:10 +02:00
parse_workflow_config_gracefully ,
)
class TestWorkflowParse(TestCase):
    """
    Test parsing scenarios of JSON Schemas
    """

    def test_get_service_type(self):
        """
        Test that we can get the service type of source
        """
        database_service = get_service_type("Mysql")
        self.assertEqual(database_service, DatabaseConnection)

        dashboard_service = get_service_type("Looker")
        self.assertEqual(dashboard_service, DashboardConnection)

        messaging_service = get_service_type("Kafka")
        self.assertEqual(messaging_service, MessagingConnection)

        metadata_service = get_service_type("Amundsen")
        self.assertEqual(metadata_service, MetadataConnection)

        # Unknown source names must raise with a descriptive message
        with self.assertRaises(ValueError) as err:
            get_service_type("random")
        self.assertEqual("Cannot find the service type of random", str(err.exception))
def test_get_connection_class ( self ) :
"""
Check that we can correctly build the connection module ingredients
"""
source_type = " Glue "
connection = get_connection_class ( source_type , get_service_type ( source_type ) )
self . assertEqual ( connection , GlueConnection )
source_type = " Tableau "
connection = get_connection_class ( source_type , get_service_type ( source_type ) )
self . assertEqual ( connection , TableauConnection )
source_type = " OpenMetadata "
connection = get_connection_class ( source_type , get_service_type ( source_type ) )
self . assertEqual ( connection , OpenMetadataConnection )
2023-04-23 18:43:46 +02:00
source_type = " Kafka "
2022-04-29 18:20:10 +02:00
connection = get_connection_class ( source_type , get_service_type ( source_type ) )
2023-04-23 18:43:46 +02:00
self . assertEqual ( connection , KafkaConnection )
2022-04-29 18:20:10 +02:00
2024-10-08 14:39:55 +05:30
source_type = " Rest "
connection = get_connection_class ( source_type , get_service_type ( source_type ) )
self . assertEqual ( connection , RestConnection )
2022-07-27 23:51:16 +05:30
def test_get_source_config_class ( self ) :
"""
Check that we can correctly build the connection module ingredients
"""
source_config_type = " Profiler "
connection = get_source_config_class ( source_config_type )
self . assertEqual ( connection , DatabaseServiceProfilerPipeline )
source_config_type = " DatabaseMetadata "
connection = get_source_config_class ( source_config_type )
self . assertEqual ( connection , DatabaseServiceMetadataPipeline )
source_config_type = " PipelineMetadata "
connection = get_source_config_class ( source_config_type )
self . assertEqual ( connection , PipelineServiceMetadataPipeline )
source_config_type = " DashboardMetadata "
connection = get_source_config_class ( source_config_type )
self . assertEqual ( connection , DashboardServiceMetadataPipeline )
2024-09-11 13:36:53 +05:30
source_config_type = " ApiMetadata "
connection = get_source_config_class ( source_config_type )
self . assertEqual ( connection , ApiServiceMetadataPipeline )
2022-04-29 18:20:10 +02:00
def test_parsing_ok ( self ) :
"""
Test MSSQL JSON Config parsing OK
"""
config_dict = {
" source " : {
" type " : " mssql " ,
" serviceName " : " test_mssql " ,
" serviceConnection " : {
" config " : {
" type " : " Mssql " ,
" database " : " master " ,
" username " : " sa " ,
" password " : " MY % password " ,
" hostPort " : " random:1433 " ,
}
} ,
2022-06-08 16:10:40 +02:00
" sourceConfig " : { " config " : { " type " : " DatabaseMetadata " } } ,
2022-04-29 18:20:10 +02:00
} ,
" sink " : { " type " : " metadata-rest " , " config " : { } } ,
" workflowConfig " : {
" loggerLevel " : " WARN " ,
" openMetadataServerConfig " : {
" hostPort " : " http://localhost:8585/api " ,
2023-12-19 12:08:48 +01:00
" authProvider " : " openmetadata " ,
" securityConfig " : { " jwtToken " : " token " } ,
2022-04-29 18:20:10 +02:00
} ,
} ,
}
self . assertIsNotNone ( parse_workflow_config_gracefully ( config_dict ) )
def test_parsing_ko_mssql ( self ) :
"""
Test MSSQL JSON Config parsing KO
"""
config_dict = {
" source " : {
" type " : " mssql " ,
" serviceName " : " test_mssql " ,
" serviceConnection " : {
" config " : {
" type " : " Mssql " ,
" database " : " master " ,
" username " : " sa " ,
" password " : " MY % password " ,
" hostPort " : " localhost:1433 " ,
" random " : " extra " ,
}
} ,
2022-06-08 16:10:40 +02:00
" sourceConfig " : { " config " : { " type " : " DatabaseMetadata " } } ,
2022-04-29 18:20:10 +02:00
} ,
" sink " : { " type " : " metadata-rest " , " config " : { } } ,
" workflowConfig " : {
" loggerLevel " : " WARN " ,
" openMetadataServerConfig " : {
" hostPort " : " http://localhost:8585/api " ,
" authProvider " : " no-auth " ,
} ,
} ,
}
2022-09-19 17:00:00 +02:00
with self . assertRaises ( ParsingConfigurationError ) as err :
2022-04-29 18:20:10 +02:00
parse_workflow_config_gracefully ( config_dict )
2022-09-19 17:00:00 +02:00
self . assertIn (
" We encountered an error parsing the configuration of your MssqlConnection. \n You might need to review your config based on the original cause of this failure: \n \t - Extra parameter ' random ' " ,
str ( err . exception ) ,
)
2022-04-29 18:20:10 +02:00
2022-07-27 23:51:16 +05:30
def test_parsing_ko_mssql_source_config ( self ) :
"""
Test MSSQL JSON Config parsing KO
"""
config_dict = {
" source " : {
" type " : " mssql " ,
" serviceName " : " test_mssql " ,
" serviceConnection " : {
" config " : {
" type " : " Mssql " ,
" database " : " master " ,
" username " : " sa " ,
" password " : " MY % password " ,
" hostPort " : " localhost:1433 " ,
}
} ,
" sourceConfig " : {
" config " : { " type " : " DatabaseMetadata " , " random " : " extra " }
} ,
} ,
" sink " : { " type " : " metadata-rest " , " config " : { } } ,
" workflowConfig " : {
" loggerLevel " : " WARN " ,
" openMetadataServerConfig " : {
" hostPort " : " http://localhost:8585/api " ,
" authProvider " : " no-auth " ,
} ,
} ,
}
2022-09-19 17:00:00 +02:00
with self . assertRaises ( ParsingConfigurationError ) as err :
2022-07-27 23:51:16 +05:30
parse_workflow_config_gracefully ( config_dict )
self . assertIn (
2022-09-19 17:00:00 +02:00
" We encountered an error parsing the configuration of your DatabaseServiceMetadataPipeline. \n You might need to review your config based on the original cause of this failure: \n \t - Extra parameter ' random ' " ,
2022-07-27 23:51:16 +05:30
str ( err . exception ) ,
)
2022-04-29 18:20:10 +02:00
def test_parsing_ko_glue ( self ) :
"""
Test Glue JSON Config parsing OK
"""
config_dict = {
" source " : {
" type " : " glue " ,
" serviceName " : " local_glue " ,
" serviceConnection " : {
" config " : {
" type " : " Glue " ,
" awsConfig " : {
" awsSecretAccessKey " : " aws secret access key " ,
" awsRegion " : " aws region " ,
" endPointURL " : " https://glue.<region_name>.amazonaws.com/ " ,
} ,
" random " : " extra " ,
}
} ,
2022-06-08 16:10:40 +02:00
" sourceConfig " : { " config " : { " type " : " DatabaseMetadata " } } ,
2022-04-29 18:20:10 +02:00
} ,
" sink " : { " type " : " metadata-rest " , " config " : { } } ,
" workflowConfig " : {
" openMetadataServerConfig " : {
" hostPort " : " http://localhost:8585/api " ,
" authProvider " : " no-auth " ,
}
} ,
}
2022-09-19 17:00:00 +02:00
with self . assertRaises ( ParsingConfigurationError ) as err :
2022-04-29 18:20:10 +02:00
parse_workflow_config_gracefully ( config_dict )
2022-05-31 12:15:02 +05:30
self . assertIn (
2022-09-19 17:00:00 +02:00
" We encountered an error parsing the configuration of your GlueConnection. \n You might need to review your config based on the original cause of this failure: \n \t - Extra parameter ' random ' " ,
2022-07-27 23:51:16 +05:30
str ( err . exception ) ,
)
def test_parsing_ko_airbyte ( self ) :
"""
Test Glue JSON Config parsing OK
"""
config_dict = {
" source " : {
" type " : " airbyte " ,
" serviceName " : " local_airbyte " ,
" serviceConnection " : {
" config " : { " type " : " Airbyte " , " hostPort " : " http://localhost:8000 " }
} ,
" sourceConfig " : {
" config " : { " type " : " PipelineMetadata " , " random " : " extra " }
} ,
} ,
" sink " : { " type " : " metadata-rest " , " config " : { } } ,
" workflowConfig " : {
" openMetadataServerConfig " : {
" hostPort " : " http://localhost:8585/api " ,
" authProvider " : " no-auth " ,
}
} ,
}
2022-09-19 17:00:00 +02:00
with self . assertRaises ( ParsingConfigurationError ) as err :
2022-07-27 23:51:16 +05:30
parse_workflow_config_gracefully ( config_dict )
self . assertIn (
2022-09-19 17:00:00 +02:00
" We encountered an error parsing the configuration of your PipelineServiceMetadataPipeline. \n You might need to review your config based on the original cause of this failure: \n \t - Extra parameter ' random ' " ,
2022-05-31 12:15:02 +05:30
str ( err . exception ) ,
)
2022-07-18 18:50:27 +02:00
2024-09-16 12:57:12 +05:30
def test_parsing_matillion_pipeline ( self ) :
"""
Test Matillion JSON Config parsing OK
"""
config_dict = {
" source " : {
" type " : " Matillion " ,
" serviceName " : " local_Matillion_123 " ,
" serviceConnection " : {
" config " : {
" type " : " Matillion " ,
" connection " : {
" type " : " MatillionETL " ,
" hostPort " : " hostport " ,
" username " : " username " ,
" password " : " password " ,
" sslConfig " : {
" caCertificate " : " -----BEGIN CERTIFICATE----- \n sample certificate \n -----END CERTIFICATE----- \n "
} ,
} ,
}
} ,
" sourceConfig " : {
" config " : { " type " : " PipelineMetadata " , " includeLineage " : True }
} ,
} ,
" sink " : { " type " : " metadata-rest " , " config " : { } } ,
" workflowConfig " : {
" loggerLevel " : " DEBUG " ,
" openMetadataServerConfig " : {
" hostPort " : " http://localhost:8585/api " ,
" authProvider " : " openmetadata " ,
" securityConfig " : {
" jwtToken " : " eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg "
} ,
} ,
} ,
}
self . assertTrue ( parse_workflow_config_gracefully ( config_dict ) )
del config_dict [ " source " ] [ " serviceConnection " ] [ " config " ] [ " connection " ] [
" sslConfig "
]
self . assertTrue ( parse_workflow_config_gracefully ( config_dict ) )
del config_dict [ " source " ] [ " serviceConnection " ] [ " config " ] [ " connection " ] [
" username "
]
del config_dict [ " source " ] [ " serviceConnection " ] [ " config " ] [ " connection " ] [
" hostPort "
]
del config_dict [ " source " ] [ " serviceConnection " ] [ " config " ] [ " connection " ] [
" password "
]
with self . assertRaises ( ParsingConfigurationError ) as err :
parse_workflow_config_gracefully ( config_dict )
self . assertIn (
" We encountered an error parsing the configuration of your MatillionConnection. \n You might need to review your config based on the original cause of this failure: \n \t - Missing parameter in ( ' connection ' , ' hostPort ' ) \n \t - Missing parameter in ( ' connection ' , ' username ' ) \n \t - Missing parameter in ( ' connection ' , ' password ' ) " ,
str ( err . exception ) ,
)
2023-12-22 09:56:39 +05:30
def test_parsing_ingestion_pipeline_mysql ( self ) :
2022-07-18 18:50:27 +02:00
"""
2023-12-22 09:56:39 +05:30
Test parsing of ingestion_pipeline for MYSQL
2022-07-18 18:50:27 +02:00
"""
config_dict = {
2023-12-22 09:56:39 +05:30
" id " : " 08868b3e-cd02-4257-a545-9080856371a0 " ,
" name " : " qwfef_metadata_SPWHTqVO " ,
" pipelineType " : " metadata " ,
" sourceConfig " : {
2022-07-18 18:50:27 +02:00
" config " : {
2023-12-22 09:56:39 +05:30
" type " : " DatabaseMetadata " ,
" includeTags " : True ,
" includeViews " : True ,
" includeTables " : True ,
" queryLogDuration " : 1 ,
" markDeletedTables " : True ,
" tableFilterPattern " : { " excludes " : [ ] , " includes " : [ ] } ,
" useFqnForFiltering " : True ,
" schemaFilterPattern " : { " excludes " : [ ] , " includes " : [ ] } ,
" databaseFilterPattern " : { " excludes " : [ ] , " includes " : [ ] } ,
" includeStoredProcedures " : True ,
" queryParsingTimeoutLimit " : 300 ,
" markDeletedStoredProcedures " : True ,
2022-07-18 18:50:27 +02:00
}
} ,
2023-12-22 09:56:39 +05:30
" airflowConfig " : {
" retries " : 0 ,
" startDate " : " 2023-12-19T00:00:00.000000Z " ,
" retryDelay " : 300 ,
" concurrency " : 1 ,
" maxActiveRuns " : 1 ,
" pausePipeline " : False ,
" pipelineCatchup " : False ,
" pipelineTimezone " : " UTC " ,
" scheduleInterval " : " 0 * * * * " ,
" workflowDefaultView " : " tree " ,
" workflowDefaultViewOrientation " : " LR " ,
} ,
2022-07-18 18:50:27 +02:00
}
self . assertIsInstance (
2023-12-22 09:56:39 +05:30
parse_ingestion_pipeline_config_gracefully ( config_dict ) ,
IngestionPipeline ,
2022-07-18 18:50:27 +02:00
)
config_dict_ko = {
2023-12-22 09:56:39 +05:30
" id " : " 08868b3e-cd02-4257-a545-9080856371a0 " ,
" name " : " qwfef_metadata_SPWHTqVO " ,
" pipelineType " : " metadata " ,
" sourceConfig " : {
2022-07-18 18:50:27 +02:00
" config " : {
2023-12-22 09:56:39 +05:30
" type " : " DatabaseMetadata " ,
" includeTags " : True ,
" includeViews " : True ,
" includeTables " : True ,
" viewLogDuration " : 1 ,
" markDeletedTables " : True ,
" tFilterPattern " : { " excludes " : [ ] , " includes " : [ ] } ,
" useFqnForFiltering " : True ,
" schemaFilterPattern " : { " excludes " : [ ] , " includes " : [ ] } ,
" databaseFilterPattern " : { " excludes " : [ ] , " includes " : [ ] } ,
" includeStoredProcedures " : True ,
" queryParsingTimeoutLimit " : 300 ,
" markDeletedStoredProcedures " : True ,
2022-07-18 18:50:27 +02:00
}
} ,
2023-12-22 09:56:39 +05:30
" airflowConfig " : {
" retries " : 0 ,
" startDate " : " 2023-12-19T00:00:00.000000Z " ,
" retryDelay " : 300 ,
" concurrency " : 1 ,
" maxActiveRuns " : 1 ,
" pausePipeline " : False ,
" pipelineCatchup " : False ,
" pipelineTimezone " : " UTC " ,
" scheduleInterval " : " 0 * * * * " ,
" workflowDefaultView " : " tree " ,
" workflowDefaultViewOrientation " : " LR " ,
} ,
2022-07-18 18:50:27 +02:00
}
with self . assertRaises ( ValidationError ) as err :
2023-12-22 09:56:39 +05:30
parse_ingestion_pipeline_config_gracefully ( config_dict_ko )
2022-07-18 18:50:27 +02:00
self . assertIn (
2024-06-05 21:18:37 +02:00
" 2 validation errors for DatabaseServiceMetadataPipeline \n viewLogDuration \n Extra inputs are not permitted " ,
2022-07-18 18:50:27 +02:00
str ( err . exception ) ,
)
2023-12-22 09:56:39 +05:30
def test_parsing_ingestion_pipeline_dagster ( self ) :
"""
Test parsing of ingestion_pipeline for Dagster
"""
config_dict = {
" id " : " da50179a-02c8-42d1-a8bd-3002a49649a6 " ,
" name " : " dagster_dev_metadata_G6pRkj7X " ,
" pipelineType " : " metadata " ,
" sourceConfig " : {
" config " : {
" type " : " PipelineMetadata " ,
" includeTags " : True ,
" includeOwners " : True ,
2024-03-15 12:42:47 +05:30
" lineageInformation " : { " dbServiceNames " : [ " dev " ] } ,
2023-12-22 09:56:39 +05:30
" includeLineage " : True ,
" markDeletedPipelines " : True ,
" pipelineFilterPattern " : {
" excludes " : [ ] ,
" includes " : [ " test_pipeline " ] ,
} ,
}
} ,
" airflowConfig " : {
" retries " : 0 ,
" startDate " : " 2023-12-19T00:00:00.000000Z " ,
" retryDelay " : 300 ,
" concurrency " : 1 ,
" maxActiveRuns " : 1 ,
" pausePipeline " : False ,
" pipelineCatchup " : False ,
" pipelineTimezone " : " UTC " ,
" scheduleInterval " : " 0 * * * * " ,
" workflowDefaultView " : " tree " ,
" workflowDefaultViewOrientation " : " LR " ,
} ,
}
self . assertIsInstance (
parse_ingestion_pipeline_config_gracefully ( config_dict ) ,
IngestionPipeline ,
)
config_dict_ko = {
" id " : " da50179a-02c8-42d1-a8bd-3002a49649a6 " ,
" name " : " dagster_dev_metadata_G6pRkj7X " ,
" pipelineType " : " metadata " ,
" sourceConfig " : {
2022-07-18 18:50:27 +02:00
" config " : {
2023-12-22 09:56:39 +05:30
" type " : " PipelineMetadata " ,
" includeTags " : True ,
" includeOwners " : True ,
2024-03-15 12:42:47 +05:30
" lineageInformation " : { " dbServiceNames " : [ " dev " ] } ,
2023-12-22 09:56:39 +05:30
" includeViewLineage " : True ,
" markDeletedDbs " : True ,
" pipelineFilterPatterns " : {
" excludes " : [ ] ,
" includes " : [ " test_pipeline " ] ,
} ,
2022-07-18 18:50:27 +02:00
}
} ,
2023-12-22 09:56:39 +05:30
" airflowConfig " : {
" retries " : 0 ,
" startDate " : " 2023-12-19T00:00:00.000000Z " ,
" retryDelay " : 300 ,
" concurrency " : 1 ,
" maxActiveRuns " : 1 ,
" pausePipeline " : False ,
" pipelineCatchup " : False ,
" pipelineTimezone " : " UTC " ,
" scheduleInterval " : " 0 * * * * " ,
" workflowDefaultView " : " tree " ,
" workflowDefaultViewOrientation " : " LR " ,
} ,
}
with self . assertRaises ( ValidationError ) as err :
parse_ingestion_pipeline_config_gracefully ( config_dict_ko )
self . assertIn (
2024-06-05 21:18:37 +02:00
" 3 validation errors for PipelineServiceMetadataPipeline \n includeViewLineage \n Extra inputs are not permitted " ,
2023-12-22 09:56:39 +05:30
str ( err . exception ) ,
)
def test_parsing_automation_workflow_airflow ( self ) :
"""
Test parsing of automation workflow for airflow
"""
config_dict = {
" id " : " 8b735b2c-194e-41a4-b383-96253f936293 " ,
" name " : " test-connection-Airflow-WhCTUSXJ " ,
" deleted " : False ,
" request " : {
" connection " : {
" config " : {
" type " : " Airflow " ,
" hostPort " : " https://localhost:8080 " ,
" connection " : {
" type " : " Mysql " ,
" scheme " : " mysql+pymysql " ,
" authType " : { " password " : " fernet:demo_password " } ,
" hostPort " : " mysql:3306 " ,
" username " : " admin@openmetadata.org " ,
" databaseName " : " airflow_db " ,
" supportsProfiler " : True ,
" supportsQueryComment " : True ,
" supportsDBTExtraction " : True ,
" supportsMetadataExtraction " : True ,
} ,
" numberOfStatus " : 10 ,
" supportsMetadataExtraction " : True ,
}
} ,
" serviceName " : " airflow_test_two " ,
" serviceType " : " Pipeline " ,
" connectionType " : " Airflow " ,
" secretsManagerProvider " : " db " ,
} ,
" version " : 0.1 ,
" updatedAt " : 1703157653864 ,
" updatedBy " : " admin " ,
" workflowType " : " TEST_CONNECTION " ,
" fullyQualifiedName " : " test-connection-Airflow-WhCTUSXJ " ,
}
self . assertIsInstance (
parse_automation_workflow_gracefully ( config_dict ) ,
AutomationWorkflow ,
)
config_dict_ko = {
" id " : " 8b735b2c-194e-41a4-b383-96253f936293 " ,
" name " : " test-connection-Airflow-WhCTUSXJ " ,
" deleted " : False ,
" request " : {
" connection " : {
" config " : {
" type " : " Airflow " ,
2024-06-05 21:18:37 +02:00
" hostPort " : " http:://localhost:8080 " ,
2023-12-22 09:56:39 +05:30
" connection " : {
" type " : " Mysql " ,
" scheme " : " mysql+pymysql " ,
" authType " : { " password " : " fernet:demo_password " } ,
" hostPort " : " mysql:3306 " ,
" username " : " admin@openmetadata.org " ,
" databaseName " : " airflow_db " ,
" supportsProfiler " : True ,
" supportsQueryComment " : True ,
" supportsDBTExtraction " : True ,
" supportsMetadataExtraction " : True ,
} ,
" numberOfStatus " : 10 ,
" supportsMetadataExtraction " : True ,
}
} ,
" serviceName " : " airflow_test_two " ,
" serviceType " : " Pipeline " ,
" connectionType " : " Airflow " ,
" secretsManagerProvider " : " db " ,
} ,
" version " : 0.1 ,
" updatedAt " : 1703157653864 ,
" updatedBy " : " admin " ,
" workflowType " : " TEST_CONNECTION " ,
" fullyQualifiedName " : " test-connection-Airflow-WhCTUSXJ " ,
}
with self . assertRaises ( ValidationError ) as err :
parse_automation_workflow_gracefully ( config_dict_ko )
self . assertIn (
2024-06-05 21:18:37 +02:00
" 1 validation error for AirflowConnection \n hostPort \n Input should be a valid URL " ,
2023-12-22 09:56:39 +05:30
str ( err . exception ) ,
)
config_dict_ko_2 = {
" id " : " 8b735b2c-194e-41a4-b383-96253f936293 " ,
" name " : " test-connection-Airflow-WhCTUSXJ " ,
" deleted " : False ,
" request " : {
" connection " : {
" config " : {
" type " : " Airflow " ,
" hostPort " : " https://localhost:8080 " ,
" connection " : {
" type " : " Mysql " ,
" scheme " : " mysql+pymysql " ,
" authType " : { " password " : " fernet:demo_password " } ,
" hostPort " : " mysql:3306 " ,
" usernam " : " admin@openmetadata.org " ,
" databaseName " : " airflow_db " ,
" supportsProfile " : True ,
" supportsQueryComment " : True ,
" supportsDBTExtraction " : True ,
" supportsMetadataExtraction " : True ,
} ,
" numberOfStatus " : 10 ,
" supportsMetadataExtraction " : True ,
}
} ,
" serviceName " : " airflow_test_two " ,
" serviceType " : " Pipeline " ,
" connectionType " : " Airflow " ,
" secretsManagerProvider " : " db " ,
} ,
" version " : 0.1 ,
" updatedAt " : 1703157653864 ,
" updatedBy " : " admin " ,
" workflowType " : " TEST_CONNECTION " ,
" fullyQualifiedName " : " test-connection-Airflow-WhCTUSXJ " ,
}
with self . assertRaises ( ValidationError ) as err :
parse_automation_workflow_gracefully ( config_dict_ko_2 )
self . assertIn (
2024-06-05 21:18:37 +02:00
" 3 validation errors for MysqlConnection \n username \n Field required " ,
2023-12-22 09:56:39 +05:30
str ( err . exception ) ,
)
def test_parsing_automation_workflow_athena ( self ) :
"""
Test parsing of automation workflow for airflow
"""
config_dict = {
" id " : " 850b194c-3d1b-4f6f-95df-83e3df5ccb24 " ,
" name " : " test-connection-Athena-EHnc3Ral " ,
" deleted " : False ,
" request " : {
" connection " : {
" config " : {
" type " : " Athena " ,
" scheme " : " awsathena+rest " ,
" awsConfig " : {
" awsRegion " : " us-east-2 " ,
" assumeRoleSessionName " : " OpenMetadataSession " ,
} ,
" workgroup " : " primary " ,
" s3StagingDir " : " s3://athena-postgres/output/ " ,
" supportsProfiler " : True ,
" supportsQueryComment " : True ,
" supportsDBTExtraction " : True ,
" supportsUsageExtraction " : True ,
" supportsLineageExtraction " : True ,
" supportsMetadataExtraction " : True ,
}
} ,
" serviceType " : " Database " ,
" connectionType " : " Athena " ,
" secretsManagerProvider " : " db " ,
} ,
" version " : 0.1 ,
" updatedAt " : 1703173676044 ,
" updatedBy " : " admin " ,
" workflowType " : " TEST_CONNECTION " ,
" fullyQualifiedName " : " test-connection-Athena-EHnc3Ral " ,
}
self . assertIsInstance (
parse_automation_workflow_gracefully ( config_dict ) ,
AutomationWorkflow ,
)
config_dict_ko = {
" id " : " 850b194c-3d1b-4f6f-95df-83e3df5ccb24 " ,
" name " : " test-connection-Athena-EHnc3Ral " ,
" deleted " : False ,
" request " : {
" connection " : {
" config " : {
" type " : " Athena " ,
" scheme " : " awsathena+rest " ,
" awsConfig " : {
" awsRegion " : " us-east-2 " ,
" assumeRoleSessionName " : " OpenMetadataSession " ,
} ,
" workgroup " : " primary " ,
" s3StagingDir " : " athena-postgres/output/ " ,
" supportsProfiler " : True ,
" supportsQueryComment " : True ,
" supportsDBTExtraction " : True ,
" supportsUsageExtraction " : True ,
" supportsLineageExtraction " : True ,
" supportsMetadataExtraction " : True ,
}
} ,
" serviceType " : " Database " ,
" connectionType " : " Athena " ,
" secretsManagerProvider " : " db " ,
} ,
" version " : 0.1 ,
" updatedAt " : 1703173676044 ,
" updatedBy " : " admin " ,
" workflowType " : " TEST_CONNECTION " ,
" fullyQualifiedName " : " test-connection-Athena-EHnc3Ral " ,
2022-07-18 18:50:27 +02:00
}
with self . assertRaises ( ValidationError ) as err :
2023-12-22 09:56:39 +05:30
parse_automation_workflow_gracefully ( config_dict_ko )
2022-07-18 18:50:27 +02:00
self . assertIn (
2024-06-05 21:18:37 +02:00
" 1 validation error for AthenaConnection \n s3StagingDir \n Input should be a valid URL " ,
2022-07-18 18:50:27 +02:00
str ( err . exception ) ,
)
2024-02-29 10:41:21 +05:30
def test_parsing_dbt_workflow_ok ( self ) :
"""
Test dbt workflow Config parsing OK
"""
config_dict = {
" source " : {
" type " : " dbt " ,
" serviceName " : " dbt_prod " ,
" sourceConfig " : {
" config " : {
" type " : " DBT " ,
" dbtConfigSource " : {
" dbtConfigType " : " local " ,
" dbtCatalogFilePath " : " /path/to/catalog.json " ,
" dbtManifestFilePath " : " /path/to/manifest.json " ,
" dbtRunResultsFilePath " : " /path/to/run_results.json " ,
} ,
" dbtUpdateDescriptions " : True ,
" includeTags " : True ,
" dbtClassificationName " : " dbtTags " ,
" databaseFilterPattern " : { " includes " : [ " test " ] } ,
" schemaFilterPattern " : {
" includes " : [ " test1 " ] ,
" excludes " : [ " .*schema.* " ] ,
} ,
" tableFilterPattern " : {
" includes " : [ " test3 " ] ,
" excludes " : [ " .*table_name.* " ] ,
} ,
}
} ,
} ,
" sink " : { " type " : " metadata-rest " , " config " : { } } ,
" workflowConfig " : {
" loggerLevel " : " DEBUG " ,
" openMetadataServerConfig " : {
" hostPort " : " http://localhost:8585/api " ,
" authProvider " : " openmetadata " ,
" securityConfig " : { " jwtToken " : " jwt_token " } ,
} ,
} ,
}
self . assertIsNotNone ( parse_workflow_config_gracefully ( config_dict ) )
def test_parsing_dbt_workflow_ko ( self ) :
"""
Test dbt workflow Config parsing OK
"""
config_dict_type_error_ko = {
" source " : {
" type " : " dbt " ,
" serviceName " : " dbt_prod " ,
" sourceConfig " : {
" config " : {
" type " : " DBT " ,
" dbtConfigSource " : {
" dbtConfigType " : " cloud " ,
" dbtCloudAuthToken " : " token " ,
" dbtCloudAccountId " : " ID " ,
" dbtCloudJobId " : " JOB ID " ,
} ,
" dbtUpdateDescriptions " : True ,
" includeTags " : True ,
" dbtClassificationName " : " dbtTags " ,
" databaseFilterPattern " : { " includes " : [ " test " ] } ,
" schemaFilterPattern " : {
" includes " : [ " test1 " ] ,
" excludes " : [ " .*schema.* " ] ,
} ,
" tableFilterPattern " : {
" includes " : [ " test3 " ] ,
" excludes " : [ " .*table_name.* " ] ,
} ,
}
} ,
} ,
" sink " : { " type " : " metadata-rest " , " config " : { } } ,
" workflowConfig " : {
" loggerLevel " : " DEBUG " ,
" openMetadataServerConfig " : {
" hostPort " : " http://localhost:8585/api " ,
" authProvider " : " openmetadata " ,
" securityConfig " : { " jwtToken " : " jwt_token " } ,
} ,
} ,
}
with self . assertRaises ( ParsingConfigurationError ) as err :
parse_workflow_config_gracefully ( config_dict_type_error_ko )
self . assertIn (
" We encountered an error parsing the configuration of your DbtCloudConfig. \n You might need to review your config based on the original cause of this failure: \n \t - Missing parameter ' dbtCloudUrl ' " ,
str ( err . exception ) ,
)
def test_parsing_dbt_pipeline_ko ( self ) :
"""
Test dbt workflow Config parsing OK
"""
config_dict_dbt_pipeline_ko = {
" source " : {
" type " : " dbt " ,
" serviceName " : " dbt_prod " ,
" sourceConfig " : {
" config " : {
" type " : " DBT " ,
" dbtConfigSource " : {
" dbtConfigType " : " cloud " ,
" dbtCloudAuthToken " : " token " ,
" dbtCloudAccountId " : " ID " ,
" dbtCloudJobId " : " JOB ID " ,
" dbtCloudUrl " : " https://clouddbt.com " ,
} ,
2024-06-05 21:18:37 +02:00
" extraParameter " : True ,
2024-02-29 10:41:21 +05:30
" includeTags " : True ,
" dbtClassificationName " : " dbtTags " ,
" databaseFilterPattern " : { " includes " : [ " test " ] } ,
" schemaFilterPattern " : {
" includes " : [ " test1 " ] ,
" excludes " : [ " .*schema.* " ] ,
} ,
" tableFilterPattern " : {
" includes " : [ " test3 " ] ,
" excludes " : [ " .*table_name.* " ] ,
} ,
}
} ,
} ,
" sink " : { " type " : " metadata-rest " , " config " : { } } ,
" workflowConfig " : {
" loggerLevel " : " DEBUG " ,
" openMetadataServerConfig " : {
" hostPort " : " http://localhost:8585/api " ,
" authProvider " : " openmetadata " ,
" securityConfig " : { " jwtToken " : " jwt_token " } ,
} ,
} ,
}
with self . assertRaises ( ParsingConfigurationError ) as err :
parse_workflow_config_gracefully ( config_dict_dbt_pipeline_ko )
self . assertIn (
2024-06-05 21:18:37 +02:00
" We encountered an error parsing the configuration of your DbtPipeline. \n You might need to review your config based on the original cause of this failure: \n \t - Extra parameter ' extraParameter ' " ,
2024-02-29 10:41:21 +05:30
str ( err . exception ) ,
)