mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-07 22:44:08 +00:00
* #13934 - Update test connection no response error msg * #13934 - Update test connection no response error msg
This commit is contained in:
parent
0b840b4f8b
commit
0332fb9fe5
@ -114,3 +114,47 @@ class TestConnectionAutomationTest(TestCase):
|
|||||||
entity_id=str(automation_workflow.id.__root__),
|
entity_id=str(automation_workflow.id.__root__),
|
||||||
hard_delete=True,
|
hard_delete=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_connection_workflow_ko(self):
|
||||||
|
"""Test connection that will fail"""
|
||||||
|
wrong_service_connection = MysqlConnection(
|
||||||
|
username="openmetadata_user",
|
||||||
|
authType=BasicAuth(password="openmetadata_password"),
|
||||||
|
hostPort="localhost:8585", # There's something running there, but it's not MySQL
|
||||||
|
databaseSchema="openmetadata_db",
|
||||||
|
)
|
||||||
|
|
||||||
|
wrong_workflow_request = CreateWorkflowRequest(
|
||||||
|
name="test-connection-mysql-bad",
|
||||||
|
description="description",
|
||||||
|
workflowType=WorkflowType.TEST_CONNECTION,
|
||||||
|
request=TestServiceConnectionRequest(
|
||||||
|
serviceType=ServiceType.Database,
|
||||||
|
connectionType=MySQLType.Mysql.value,
|
||||||
|
connection=DatabaseConnection(
|
||||||
|
config=wrong_service_connection,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
automation_workflow: Workflow = self.metadata.create_or_update(
|
||||||
|
data=wrong_workflow_request
|
||||||
|
)
|
||||||
|
engine: Engine = get_connection(wrong_service_connection)
|
||||||
|
|
||||||
|
test_connection_fn = get_test_connection_fn(wrong_service_connection)
|
||||||
|
test_connection_fn(
|
||||||
|
self.metadata, engine, wrong_service_connection, automation_workflow
|
||||||
|
)
|
||||||
|
|
||||||
|
final_workflow: Workflow = self.metadata.get_by_name(
|
||||||
|
entity=Workflow, fqn="test-connection-mysql-bad"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(final_workflow.response.status, StatusType.Failed)
|
||||||
|
|
||||||
|
self.metadata.delete(
|
||||||
|
entity=Workflow,
|
||||||
|
entity_id=str(automation_workflow.id.__root__),
|
||||||
|
hard_delete=True,
|
||||||
|
)
|
||||||
|
|||||||
@ -61,6 +61,7 @@ def get_fn(blueprint: Blueprint) -> Callable:
|
|||||||
automation_workflow.openMetadataServerConnection.secretsManagerLoader,
|
automation_workflow.openMetadataServerConnection.secretsManagerLoader,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Should this be triggered async?
|
||||||
execute(automation_workflow)
|
execute(automation_workflow)
|
||||||
|
|
||||||
return ApiResponse.success(
|
return ApiResponse.success(
|
||||||
|
|||||||
@ -132,11 +132,11 @@ public class AirflowRESTClient extends PipelineServiceClient {
|
|||||||
}
|
}
|
||||||
} catch (IOException | URISyntaxException e) {
|
} catch (IOException | URISyntaxException e) {
|
||||||
throw IngestionPipelineDeploymentException.byMessage(
|
throw IngestionPipelineDeploymentException.byMessage(
|
||||||
ingestionPipeline.getName(), DEPLOYEMENT_ERROR, e.getMessage());
|
ingestionPipeline.getName(), DEPLOYMENT_ERROR, e.getMessage());
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
throw IngestionPipelineDeploymentException.byMessage(
|
throw IngestionPipelineDeploymentException.byMessage(
|
||||||
ingestionPipeline.getName(), DEPLOYEMENT_ERROR, e.getMessage());
|
ingestionPipeline.getName(), DEPLOYMENT_ERROR, e.getMessage());
|
||||||
}
|
}
|
||||||
throw new PipelineServiceClientException(
|
throw new PipelineServiceClientException(
|
||||||
String.format(
|
String.format(
|
||||||
@ -329,8 +329,13 @@ public class AirflowRESTClient extends PipelineServiceClient {
|
|||||||
return getResponse(200, response.body());
|
return getResponse(200, response.body());
|
||||||
}
|
}
|
||||||
} catch (IOException | URISyntaxException e) {
|
} catch (IOException | URISyntaxException e) {
|
||||||
|
// We can end up here if the test connection is not sending back anything after the POST
|
||||||
|
// request
|
||||||
|
// due to the connection to the source service not being properly resolved.
|
||||||
throw IngestionPipelineDeploymentException.byMessage(
|
throw IngestionPipelineDeploymentException.byMessage(
|
||||||
workflow.getName(), TRIGGER_ERROR, e.getMessage());
|
workflow.getName(),
|
||||||
|
TRIGGER_ERROR,
|
||||||
|
"No response from the test connection. Make sure your service is reachable and accepting connections");
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
throw IngestionPipelineDeploymentException.byMessage(
|
throw IngestionPipelineDeploymentException.byMessage(
|
||||||
@ -368,11 +373,11 @@ public class AirflowRESTClient extends PipelineServiceClient {
|
|||||||
}
|
}
|
||||||
} catch (IOException | URISyntaxException e) {
|
} catch (IOException | URISyntaxException e) {
|
||||||
throw IngestionPipelineDeploymentException.byMessage(
|
throw IngestionPipelineDeploymentException.byMessage(
|
||||||
workflowPayload, DEPLOYEMENT_ERROR, e.getMessage());
|
workflowPayload, DEPLOYMENT_ERROR, e.getMessage());
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
throw IngestionPipelineDeploymentException.byMessage(
|
throw IngestionPipelineDeploymentException.byMessage(
|
||||||
workflowPayload, DEPLOYEMENT_ERROR, e.getMessage());
|
workflowPayload, DEPLOYMENT_ERROR, e.getMessage());
|
||||||
}
|
}
|
||||||
throw new PipelineServiceClientException(
|
throw new PipelineServiceClientException(
|
||||||
String.format(
|
String.format(
|
||||||
|
|||||||
@ -78,7 +78,7 @@ public abstract class PipelineServiceClient {
|
|||||||
public static final String APP_TRIGGER = "run_application";
|
public static final String APP_TRIGGER = "run_application";
|
||||||
public static final String APP_VALIDATE = "validate_registration";
|
public static final String APP_VALIDATE = "validate_registration";
|
||||||
|
|
||||||
public static final String DEPLOYEMENT_ERROR = "DEPLOYMENT_ERROR";
|
public static final String DEPLOYMENT_ERROR = "DEPLOYMENT_ERROR";
|
||||||
public static final String TRIGGER_ERROR = "TRIGGER_ERROR";
|
public static final String TRIGGER_ERROR = "TRIGGER_ERROR";
|
||||||
public static final Map<String, String> TYPE_TO_TASK =
|
public static final Map<String, String> TYPE_TO_TASK =
|
||||||
Map.of(
|
Map.of(
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user