Add more context to airflow exceptions (#9441)

This commit is contained in:
Pere Miquel Brull 2022-12-21 11:01:54 +01:00 committed by GitHub
parent b9242c1390
commit bef34e45a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -71,10 +71,24 @@ logger = workflow_logger()
class InvalidServiceException(Exception): class InvalidServiceException(Exception):
"""
The service type we received is not supported
"""
class GetServiceException(Exception):
""" """
Exception to be thrown when couldn't fetch the service from server Exception to be thrown when couldn't fetch the service from server
""" """
def __init__(self, service_type: str, service_name: str):
self.message = (
f"Could not get service from type {service_type}. This means that the"
" OpenMetadata client running in the Airflow host had issues getting"
f" the service {service_name}. Validate your ingestion-bot authentication."
)
super().__init__(self.message)
class ClientInitializationError(Exception): class ClientInitializationError(Exception):
""" """
@ -100,7 +114,12 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
try: try:
metadata = OpenMetadata(config=ingestion_pipeline.openMetadataServerConnection) metadata = OpenMetadata(config=ingestion_pipeline.openMetadataServerConnection)
except Exception as exc: except Exception as exc:
raise ClientInitializationError(f"Failed to initialize the client: {exc}") raise ClientInitializationError(
f"Failed to initialize the OpenMetadata client due to: {exc}."
" Make sure that the Airflow host can reach the OpenMetadata"
f" server running at {ingestion_pipeline.openMetadataServerConnection.hostPort}"
" and that the client and server are in the same version."
)
service_type = ingestion_pipeline.service.type service_type = ingestion_pipeline.service.type
@ -109,9 +128,8 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
entity=TestSuite, fqn=ingestion_pipeline.service.name entity=TestSuite, fqn=ingestion_pipeline.service.name
) # check we are able to access OM server ) # check we are able to access OM server
if not service: if not service:
raise InvalidServiceException( raise GetServiceException(service_type, ingestion_pipeline.service.name)
f"Could not get service from type {service_type}"
)
return WorkflowSource( return WorkflowSource(
type=service_type, type=service_type,
serviceName=ingestion_pipeline.service.name, serviceName=ingestion_pipeline.service.name,
@ -177,7 +195,7 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
) )
if not service: if not service:
raise InvalidServiceException(f"Could not get service from type {service_type}") raise GetServiceException(service_type, ingestion_pipeline.service.name)
return WorkflowSource( return WorkflowSource(
type=service.serviceType.value.lower(), type=service.serviceType.value.lower(),