From 51a275c481a658f0d92ef88e20648a5002ba97c6 Mon Sep 17 00:00:00 2001
From: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
Date: Mon, 13 Jun 2022 10:15:25 +0530
Subject: [PATCH] Fix #1121: Added Airbyte Source (#5372)

---
 .../pipeline/airbyteConnection.json           |  35 +++
 .../entity/services/pipelineService.json      |   8 +-
 ingestion/examples/workflows/airbyte.yaml     |  17 ++
 .../src/metadata/ingestion/api/parser.py      |   6 +
 .../ingestion/source/pipeline/airbyte.py      | 282 ++++++++++++++++++
 .../src/metadata/utils/airbyte_client.py      |  86 ++++++
 6 files changed, 433 insertions(+), 1 deletion(-)
 create mode 100644 catalog-rest-service/src/main/resources/json/schema/entity/services/connections/pipeline/airbyteConnection.json
 create mode 100644 ingestion/examples/workflows/airbyte.yaml
 create mode 100644 ingestion/src/metadata/ingestion/source/pipeline/airbyte.py
 create mode 100644 ingestion/src/metadata/utils/airbyte_client.py

diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/pipeline/airbyteConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/pipeline/airbyteConnection.json
new file mode 100644
index 00000000000..f6e702a3011
--- /dev/null
+++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/pipeline/airbyteConnection.json
@@ -0,0 +1,35 @@
+{
+  "$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/airbyteConnection.json",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "AirbyteConnection",
+  "description": "Airbyte Connection Config",
+  "type": "object",
+  "javaType": "org.openmetadata.catalog.services.connections.pipeline.AirbyteConnection",
+  "definitions": {
+    "AirbyteType": {
+      "description": "Service type.",
+      "type": "string",
+      "enum": ["Airbyte"],
+      "default": "Airbyte"
+    }
+  },
+  "properties": {
+    "type": {
+      "title": "Service Type",
+      "description": "Service Type",
+      "$ref": "#/definitions/AirbyteType",
+      "default": "Airbyte"
+    },
+    "hostPort": {
+      "description": "Pipeline Service Management/UI URL.",
+      "type": "string",
+      "format": "uri"
+    },
+    "supportsMetadataExtraction": {
+      "title": "Supports Metadata Extraction",
+      "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
+    }
+  },
+  "additionalProperties": false,
+  "required": ["hostPort"]
+}
diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json
index d56aa9fe925..477779e6223 100644
--- a/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json
+++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json
@@ -10,7 +10,7 @@
     "pipelineServiceType": {
       "description": "Type of pipeline service - Airflow or Prefect.",
       "type": "string",
-      "enum": ["Airflow", "Prefect", "Glue", "Generic"],
+      "enum": ["Airflow", "Prefect", "Glue", "Generic", "Airbyte"],
       "javaEnums": [
         {
           "name": "Airflow"
@@ -23,6 +23,9 @@
         },
         {
           "name": "Generic"
+        },
+        {
+          "name": "Airbyte"
         }
       ]
     },
@@ -38,6 +41,9 @@
         },
         {
           "$ref": "./connections/pipeline/glueConnection.json"
+        },
+        {
+          "$ref": "./connections/pipeline/airbyteConnection.json"
         }
       ]
     }
diff --git a/ingestion/examples/workflows/airbyte.yaml b/ingestion/examples/workflows/airbyte.yaml
new file mode 100644
index 00000000000..2ca3a350ab1
--- /dev/null
+++ b/ingestion/examples/workflows/airbyte.yaml
@@ -0,0 +1,17 @@
+source:
+  type: airbyte
+  serviceName: airbyte_source1
+  serviceConnection:
+    config:
+      type: Airbyte
+      hostPort: http://localhost:8000
+  sourceConfig:
+    config:
+      type: PipelineMetadata
+sink:
+  type: metadata-rest
+  config: {}
+workflowConfig:
+  openMetadataServerConfig:
+    hostPort: http://localhost:8585/api
+    authProvider: no-auth
diff --git a/ingestion/src/metadata/ingestion/api/parser.py b/ingestion/src/metadata/ingestion/api/parser.py
index 128a87ce5ca..ffcc0e7b20a 100644
--- a/ingestion/src/metadata/ingestion/api/parser.py
+++ b/ingestion/src/metadata/ingestion/api/parser.py
@@ -32,6 +32,10 @@ from metadata.generated.schema.entity.services.metadataService import (
     MetadataConnection,
     MetadataServiceType,
 )
+from metadata.generated.schema.entity.services.pipelineService import (
+    PipelineConnection,
+    PipelineServiceType,
+)
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataWorkflowConfig,
     WorkflowConfig,
@@ -65,6 +69,8 @@ def get_service_type(
         return MessagingConnection
     if source_type in MetadataServiceType.__members__:
         return MetadataConnection
+    if source_type in PipelineServiceType.__members__:
+        return PipelineConnection
     raise ValueError(f"Cannot find the service type of {source_type}")
 
 
diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py b/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py
new file mode 100644
index 00000000000..82ef93534ad
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py
@@ -0,0 +1,282 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
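+#
+#  This source walks every Airbyte workspace, lists its connections, and for
+#  each connection emits a Pipeline entity with one Task per job, the job
+#  statuses, and table-level lineage derived from the connection's sync catalog.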
+""" +Airbyte source to extract metadata +""" +import traceback +from dataclasses import dataclass, field +from typing import Iterable, List, Optional + +from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.pipeline import ( + Pipeline, + PipelineStatus, + StatusType, + Task, + TaskStatus, +) +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.connections.pipeline.airbyteConnection import ( + AirbyteConnection, +) +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.entity.services.pipelineService import PipelineService +from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import ( + PipelineServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.api.common import Entity +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus +from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils import fqn +from metadata.utils.airbyte_client import AirbyteClient +from metadata.utils.filters import filter_by_pipeline +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +@dataclass +class AirbyteSourceStatus(SourceStatus): + pipelines_scanned: List[str] = field(default_factory=list) + filtered: List[str] = field(default_factory=list) + + def pipeline_scanned(self, topic: str) -> None: + self.pipelines_scanned.append(topic) + + def dropped(self, topic: str) -> None: + self.filtered.append(topic) + + +STATUS_MAP = { + "cancelled": StatusType.Failed, + "succeeded": StatusType.Successful, + "failed": StatusType.Failed, + "running": StatusType.Pending, + "incomplete": StatusType.Failed, + "pending": StatusType.Pending, +} + + +class AirbyteSource(Source[CreatePipelineRequest]): + """ + Implements the necessary methods ot extract + Pipeline metadata from Airflow's metadata db + """ + + config: WorkflowSource + report: AirbyteSourceStatus + + def __init__( + self, + config: WorkflowSource, + metadata_config: OpenMetadataConnection, + ): + super().__init__() + self.config = config + self.source_config: PipelineServiceMetadataPipeline = ( + self.config.sourceConfig.config + ) + self.service_connection = self.config.serviceConnection.__root__.config + self.metadata_config = metadata_config + self.metadata = OpenMetadata(self.metadata_config) + self.status = AirbyteSourceStatus() + self.service: PipelineService = self.metadata.get_service_or_create( + entity=PipelineService, config=config + ) + + self.client = AirbyteClient(self.service_connection) + + @classmethod + def create(cls, config_dict, metadata_config: OpenMetadataConnection): + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: AirbyteConnection = config.serviceConnection.__root__.config + if not isinstance(connection, AirbyteConnection): + raise InvalidSourceException( + f"Expected 
+                f"Expected AirbyteConnection, but got {connection}"
+            )
+        return cls(config, metadata_config)
+
+    def prepare(self):
+        pass
+
+    def get_connections_jobs(self, connection: dict, connection_url: str):
+        """
+        Returns the list of tasks linked to the connection
+        """
+        return [
+            Task(
+                name=job["job"]["id"],
+                displayName=job["job"]["id"],
+                description="",
+                taskUrl=f"{connection_url}/status",
+                taskType=job["job"]["configType"],
+            )
+            for job in self.client.list_jobs(connection.get("connectionId"))
+            if job
+        ]
+
+    def fetch_pipeline(
+        self, connection: dict, workspace: dict
+    ) -> Iterable[CreatePipelineRequest]:
+        """
+        Convert a connection into a Pipeline entity
+        :param connection: connection object from airbyte
+        :param workspace: workspace object from airbyte
+        :return: Create Pipeline request with tasks
+        """
+        connection_url = f"/workspaces/{workspace.get('workspaceId')}/connections/{connection.get('connectionId')}"
+        yield CreatePipelineRequest(
+            name=connection.get("connectionId"),
+            displayName=connection.get("name"),
+            description="",
+            pipelineUrl=connection_url,
+            tasks=self.get_connections_jobs(connection, connection_url),
+            service=EntityReference(id=self.service.id, type="pipelineService"),
+        )
+
+    def fetch_pipeline_status(
+        self, connection: dict, pipeline_fqn: str
+    ) -> Iterable[OMetaPipelineStatus]:
+        """
+        Method to get task & pipeline status
+        """
+        task_status = [
+            TaskStatus(
+                name=job["job"]["id"],
+                executionStatus=STATUS_MAP.get(
+                    job["job"]["status"].lower(), StatusType.Pending
+                ).value,
+            )
+            for job in self.client.list_jobs(connection.get("connectionId"))
+        ]
+        pipeline_status = PipelineStatus(taskStatus=task_status)
+        yield OMetaPipelineStatus(
+            pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status
+        )
+
+    def fetch_lineage(
+        self, connection: dict, pipeline_entity: Pipeline
+    ) -> Optional[Iterable[AddLineageRequest]]:
+        """
+        Parse all the streams available in the connection and create lineage between them
+        :param connection: connection object from airbyte
+        :param pipeline_entity: Pipeline we just ingested
+        :return: Lineage from inlets and outlets
+        """
+        source_connection = self.client.get_source(connection.get("sourceId"))
+        destination_connection = self.client.get_destination(
+            connection.get("destinationId")
+        )
+        source_service = self.metadata.get_by_name(
+            entity=DatabaseService, fqn=source_connection.get("name")
+        )
+        destination_service = self.metadata.get_by_name(
+            entity=DatabaseService, fqn=destination_connection.get("name")
+        )
+        if not source_service or not destination_service:
+            return
+
+        for task in connection.get("syncCatalog", {}).get("streams") or []:
+            stream = task.get("stream")
+            from_fqn = fqn.build(
+                self.metadata,
+                Table,
+                table_name=stream.get("name"),
+                database_name=None,
+                schema_name=stream.get("namespace"),
+                service_name=source_connection.get("name"),
+            )
+
+            to_fqn = fqn.build(
+                self.metadata,
+                Table,
+                table_name=stream.get("name"),
+                database_name=None,
+                schema_name=stream.get("namespace"),
+                service_name=destination_connection.get("name"),
+            )
+
+            if not from_fqn and not to_fqn:
+                continue
+
+            from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn)
+            to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn)
+            # Skip streams whose tables have not been ingested yet; otherwise
+            # from_entity.id / to_entity.id would raise on a None entity.
+            if not from_entity or not to_entity:
+                continue
+            yield AddLineageRequest(
+                edge=EntitiesEdge(
+                    fromEntity=EntityReference(id=from_entity.id, type="table"),
+                    toEntity=EntityReference(id=pipeline_entity.id, type="pipeline"),
+                )
+            )
+            yield AddLineageRequest(
+                edge=EntitiesEdge(
+                    toEntity=EntityReference(id=to_entity.id, type="table"),
+                    fromEntity=EntityReference(id=pipeline_entity.id, type="pipeline"),
+                )
+            )
+
+    def next_record(self) -> Iterable[Entity]:
+        """
+        Extract metadata information to create Pipelines with Tasks
+        """
+        for workspace in self.client.list_workspaces():
+            for connection in self.client.list_connections(
+                workspace_id=workspace.get("workspaceId")
+            ):
+                try:
+                    if filter_by_pipeline(
+                        self.source_config.pipelineFilterPattern,
+                        connection.get("connectionId"),
+                    ):
+                        continue
+                    yield from self.fetch_pipeline(connection, workspace)
+                    pipeline_fqn = fqn.build(
+                        self.metadata,
+                        entity_type=Pipeline,
+                        service_name=self.service.name.__root__,
+                        pipeline_name=connection.get("connectionId"),
+                    )
+                    yield from self.fetch_pipeline_status(connection, pipeline_fqn)
+                    if self.source_config.includeLineage:
+                        pipeline_entity: Pipeline = self.metadata.get_by_name(
+                            entity=Pipeline,
+                            fqn=pipeline_fqn,
+                        )
+                        yield from self.fetch_lineage(connection, pipeline_entity) or []
+
+                except Exception as err:
+                    logger.error(repr(err))
+                    logger.debug(traceback.format_exc())
+                    self.status.failure(connection.get("connectionId"), repr(err))
+
+    def get_status(self):
+        return self.status
+
+    def close(self):
+        pass
+
+    def test_connection(self) -> None:
+        pass
diff --git a/ingestion/src/metadata/utils/airbyte_client.py b/ingestion/src/metadata/utils/airbyte_client.py
new file mode 100644
index 00000000000..939ad16e523
--- /dev/null
+++ b/ingestion/src/metadata/utils/airbyte_client.py
@@ -0,0 +1,86 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
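+#
+#  Airbyte's Configuration API is POST-only: every endpoint used below takes a
+#  JSON body and returns a JSON document. A response carrying an
+#  "exceptionStack" key signals a server-side failure, surfaced as an APIError.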
+""" +Client to interact with airbyte apis +""" +import json +from typing import List + +from metadata.generated.schema.entity.services.connections.pipeline.airbyteConnection import ( + AirbyteConnection, +) +from metadata.ingestion.ometa.client import REST, APIError, ClientConfig + + +class AirbyteClient: + def __init__(self, config: AirbyteConnection): + self.config = config + client_config: ClientConfig = ClientConfig( + base_url=self.config.hostPort, + api_version="api/v1", + auth_header="Authorization", + auth_token=lambda: ("no_token", 0), + ) + self.client = REST(client_config) + + def list_workspaces(self) -> List[dict]: + """ + Method returns the list of workflows + an airbyte instance can contain multiple workflows + """ + response = self.client.post(f"/workspaces/list") + if response.get("exceptionStack"): + raise APIError(response["message"]) + return response.get("workspaces") + + def list_connections(self, workflow_id: str) -> List[dict]: + """ + Method returns the list all of connections of workflow + """ + data = {"workspaceId": workflow_id} + response = self.client.post(f"/connections/list", data=json.dumps(data)) + if response.get("exceptionStack"): + raise APIError(response["message"]) + return response.get("connections") + + def list_jobs(self, connection_id: str) -> List[dict]: + """ + Method returns the list all of jobs of a connection + """ + data = {"configId": connection_id, "configTypes": ["sync", "reset_connection"]} + response = self.client.post(f"/jobs/list", data=json.dumps(data)) + if response.get("exceptionStack"): + raise APIError(response["message"]) + return response.get("jobs") + + def get_source(self, source_id: str) -> dict: + """ + Method returns source details + """ + data = {"sourceId": source_id} + response = self.client.post(f"/sources/get", data=json.dumps(data)) + if response.get("exceptionStack"): + raise APIError(response["message"]) + return response + + def get_destination(self, destination_id: str) -> dict: + """ + Method returns destination details + """ + data = {"destinationId": destination_id} + response = self.client.post(f"/destinations/get", data=json.dumps(data)) + if response.get("exceptionStack"): + raise APIError(response["message"]) + return response