diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index 7fc3eaf8dd1..71185f4243f 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -71,10 +71,24 @@ logger = workflow_logger() class InvalidServiceException(Exception): + """ + The service type we received is not supported + """ + + +class GetServiceException(Exception): """ Exception to be thrown when couldn't fetch the service from server """ + def __init__(self, service_type: str, service_name: str): + self.message = ( + f"Could not get service from type {service_type}. This means that the" + " OpenMetadata client running in the Airflow host had issues getting" + f" the service {service_name}. Validate your ingestion-bot authentication." + ) + super().__init__(self.message) + class ClientInitializationError(Exception): """ @@ -100,7 +114,12 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: try: metadata = OpenMetadata(config=ingestion_pipeline.openMetadataServerConnection) except Exception as exc: - raise ClientInitializationError(f"Failed to initialize the client: {exc}") + raise ClientInitializationError( + f"Failed to initialize the OpenMetadata client due to: {exc}." + " Make sure that the Airflow host can reach the OpenMetadata" + f" server running at {ingestion_pipeline.openMetadataServerConnection.hostPort}" + " and that the client and server are in the same version." + ) service_type = ingestion_pipeline.service.type @@ -109,9 +128,8 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: entity=TestSuite, fqn=ingestion_pipeline.service.name ) # check we are able to access OM server if not service: - raise InvalidServiceException( - f"Could not get service from type {service_type}" - ) + raise GetServiceException(service_type, ingestion_pipeline.service.name) + return WorkflowSource( type=service_type, serviceName=ingestion_pipeline.service.name, @@ -177,7 +195,7 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: ) if not service: - raise InvalidServiceException(f"Could not get service from type {service_type}") + raise GetServiceException(service_type, ingestion_pipeline.service.name) return WorkflowSource( type=service.serviceType.value.lower(),