From 196b023643cbb8cd3be11294c7b40aa9aaf8e88b Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Fri, 17 Jun 2022 23:08:18 +0530 Subject: [PATCH] airflow-test-connection-fixed (#5502) * airflow-test-connection-fixed * airbyte-test-connection-added --- .../testServiceConnection.json | 3 ++ .../ingestion/source/pipeline/airflow.py | 3 +- .../src/metadata/utils/connection_clients.py | 5 +++ ingestion/src/metadata/utils/connections.py | 44 +++++++++++++++++++ 4 files changed, 54 insertions(+), 1 deletion(-) diff --git a/catalog-rest-service/src/main/resources/json/schema/api/services/ingestionPipelines/testServiceConnection.json b/catalog-rest-service/src/main/resources/json/schema/api/services/ingestionPipelines/testServiceConnection.json index 3e5a0274089..1b8c2548aaf 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/services/ingestionPipelines/testServiceConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/services/ingestionPipelines/testServiceConnection.json @@ -16,6 +16,9 @@ }, { "$ref": "../../../entity/services/messagingService.json#/definitions/messagingConnection" + }, + { + "$ref": "../../../entity/services/pipelineService.json#/definitions/pipelineConnection" } ] }, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py index a3e49950f2b..741102b2cb8 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py @@ -169,10 +169,11 @@ class AirflowSource(Source[CreatePipelineRequest]): ) for task in tasks ] + pipeline_status = PipelineStatus( taskStatus=task_statuses, executionStatus=STATUS_MAP.get(dag._state, StatusType.Pending.value), - executionDate=dag.start_date.timestamp(), + executionDate=dag.execution_date.timestamp(), ) yield OMetaPipelineStatus( pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status diff --git a/ingestion/src/metadata/utils/connection_clients.py b/ingestion/src/metadata/utils/connection_clients.py index 31eaf3838ec..c85adad11c5 100644 --- a/ingestion/src/metadata/utils/connection_clients.py +++ b/ingestion/src/metadata/utils/connection_clients.py @@ -91,3 +91,8 @@ class DatalakeClient: def __init__(self, client, config) -> None: self.client = client self.config = config + + +class AirByteClient: + def __init__(self, client) -> None: + self.client = client diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py index a070af74d5d..f4372a3cc91 100644 --- a/ingestion/src/metadata/utils/connections.py +++ b/ingestion/src/metadata/utils/connections.py @@ -77,8 +77,15 @@ from metadata.generated.schema.entity.services.connections.database.snowflakeCon from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import ( KafkaConnection, ) +from metadata.generated.schema.entity.services.connections.pipeline.airbyteConnection import ( + AirbyteConnection, +) +from metadata.generated.schema.entity.services.connections.pipeline.airflowConnection import ( + AirflowConnection, +) from metadata.orm_profiler.orm.functions.conn_test import ConnTestFn from metadata.utils.connection_clients import ( + AirByteClient, DatalakeClient, DeltaLakeClient, DynamoClient, @@ -437,6 +444,43 @@ def _(connection: MetabaseClient) -> None: ) +@test_connection.register +def _(connection: AirflowConnection) -> None: + try: + test_connection(connection.connection) + except Exception as err: + raise SourceConnectionException( + f"Unknown error connecting with {connection} - {err}." + ) + + +@get_connection.register +def _(connection: AirflowConnection) -> None: + try: + return get_connection(connection.connection) + except Exception as err: + raise SourceConnectionException( + f"Unknown error connecting with {connection} - {err}." + ) + + +@get_connection.register +def _(connection: AirbyteConnection, verbose: bool = False): + from metadata.utils.airbyte_client import AirbyteClient + + return AirByteClient(AirbyteClient(connection)) + + +@test_connection.register +def _(connection: AirByteClient) -> None: + try: + connection.client.list_workspaces() + except Exception as err: + raise SourceConnectionException( + f"Unknown error connecting with {connection} - {err}." + ) + + @get_connection.register def _(connection: RedashConnection, verbose: bool = False):