airflow-test-connection-fixed (#5502)

* airflow-test-connection-fixed

* airbyte-test-connection-added
This commit is contained in:
Abhishek Pandey 2022-06-17 23:08:18 +05:30 committed by GitHub
parent 80abd72217
commit 196b023643
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 1 deletions

View File

@ -16,6 +16,9 @@
}, },
{ {
"$ref": "../../../entity/services/messagingService.json#/definitions/messagingConnection" "$ref": "../../../entity/services/messagingService.json#/definitions/messagingConnection"
},
{
"$ref": "../../../entity/services/pipelineService.json#/definitions/pipelineConnection"
} }
] ]
}, },

View File

@ -169,10 +169,11 @@ class AirflowSource(Source[CreatePipelineRequest]):
) )
for task in tasks for task in tasks
] ]
pipeline_status = PipelineStatus( pipeline_status = PipelineStatus(
taskStatus=task_statuses, taskStatus=task_statuses,
executionStatus=STATUS_MAP.get(dag._state, StatusType.Pending.value), executionStatus=STATUS_MAP.get(dag._state, StatusType.Pending.value),
executionDate=dag.start_date.timestamp(), executionDate=dag.execution_date.timestamp(),
) )
yield OMetaPipelineStatus( yield OMetaPipelineStatus(
pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status

View File

@ -91,3 +91,8 @@ class DatalakeClient:
def __init__(self, client, config) -> None: def __init__(self, client, config) -> None:
self.client = client self.client = client
self.config = config self.config = config
class AirByteClient:
def __init__(self, client) -> None:
self.client = client

View File

@ -77,8 +77,15 @@ from metadata.generated.schema.entity.services.connections.database.snowflakeCon
from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import ( from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
KafkaConnection, KafkaConnection,
) )
from metadata.generated.schema.entity.services.connections.pipeline.airbyteConnection import (
AirbyteConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.airflowConnection import (
AirflowConnection,
)
from metadata.orm_profiler.orm.functions.conn_test import ConnTestFn from metadata.orm_profiler.orm.functions.conn_test import ConnTestFn
from metadata.utils.connection_clients import ( from metadata.utils.connection_clients import (
AirByteClient,
DatalakeClient, DatalakeClient,
DeltaLakeClient, DeltaLakeClient,
DynamoClient, DynamoClient,
@ -437,6 +444,43 @@ def _(connection: MetabaseClient) -> None:
) )
@test_connection.register
def _(connection: AirflowConnection) -> None:
try:
test_connection(connection.connection)
except Exception as err:
raise SourceConnectionException(
f"Unknown error connecting with {connection} - {err}."
)
@get_connection.register
def _(connection: AirflowConnection) -> None:
try:
return get_connection(connection.connection)
except Exception as err:
raise SourceConnectionException(
f"Unknown error connecting with {connection} - {err}."
)
@get_connection.register
def _(connection: AirbyteConnection, verbose: bool = False):
from metadata.utils.airbyte_client import AirbyteClient
return AirByteClient(AirbyteClient(connection))
@test_connection.register
def _(connection: AirByteClient) -> None:
try:
connection.client.list_workspaces()
except Exception as err:
raise SourceConnectionException(
f"Unknown error connecting with {connection} - {err}."
)
@get_connection.register @get_connection.register
def _(connection: RedashConnection, verbose: bool = False): def _(connection: RedashConnection, verbose: bool = False):