Fix #1121: Added Airbyte Source (#5372)

Fix #1121: Added Airbyte Source (#5372)
This commit is contained in:
Mayur Singal 2022-06-13 10:15:25 +05:30 committed by GitHub
parent ce149c86be
commit 51a275c481
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 423 additions and 1 deletions

View File

@ -0,0 +1,35 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/airbyteConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "AirbyteConnection",
"description": "Airbyte Metadata Database Connection Config",
"type": "object",
"javaType": "org.openmetadata.catalog.services.connections.pipeline.AirbyteConnection",
"definitions": {
"AirbyteType": {
"description": "Service type.",
"type": "string",
"enum": ["Airbyte"],
"default": "Airbyte"
}
},
"properties": {
"type": {
"title": "Service Type",
"description": "Service Type",
"$ref": "#/definitions/AirbyteType",
"default": "Airbyte"
},
"hostPort": {
"description": "Pipeline Service Management/UI URL.",
"type": "string",
"format": "uri"
},
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
}
},
"additionalProperties": false,
"required": ["hostPort"]
}

View File

@ -10,7 +10,7 @@
"pipelineServiceType": { "pipelineServiceType": {
"description": "Type of pipeline service - Airflow or Prefect.", "description": "Type of pipeline service - Airflow or Prefect.",
"type": "string", "type": "string",
"enum": ["Airflow", "Prefect", "Glue", "Generic"], "enum": ["Airflow", "Prefect", "Glue", "Generic", "Airbyte"],
"javaEnums": [ "javaEnums": [
{ {
"name": "Airflow" "name": "Airflow"
@ -23,6 +23,9 @@
}, },
{ {
"name": "Generic" "name": "Generic"
},
{
"name": "Airbyte"
} }
] ]
}, },
@ -38,6 +41,9 @@
}, },
{ {
"$ref": "./connections/pipeline/glueConnection.json" "$ref": "./connections/pipeline/glueConnection.json"
},
{
"$ref": "./connections/pipeline/airbyteConnection.json"
} }
] ]
} }

View File

@ -0,0 +1,17 @@
source:
type: airbyte
serviceName: airbyte_source1
serviceConnection:
config:
type: Airbyte
hostPort: http://localhost:8000
sourceConfig:
config:
type: PipelineMetadata
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: no-auth

View File

@ -32,6 +32,10 @@ from metadata.generated.schema.entity.services.metadataService import (
MetadataConnection, MetadataConnection,
MetadataServiceType, MetadataServiceType,
) )
from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
PipelineServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig, OpenMetadataWorkflowConfig,
WorkflowConfig, WorkflowConfig,
@ -65,6 +69,8 @@ def get_service_type(
return MessagingConnection return MessagingConnection
if source_type in MetadataServiceType.__members__: if source_type in MetadataServiceType.__members__:
return MetadataConnection return MetadataConnection
if source_type in PipelineServiceType.__members__:
return PipelineConnection
raise ValueError(f"Cannot find the service type of {source_type}") raise ValueError(f"Cannot find the service type of {source_type}")

View File

@ -0,0 +1,276 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Airbyte source to extract metadata
"""
import traceback
from dataclasses import dataclass, field
from typing import Iterable, List, Optional
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import (
Pipeline,
PipelineStatus,
StatusType,
Task,
TaskStatus,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.airbyteConnection import (
AirbyteConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import (
PipelineServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.airbyte_client import AirbyteClient
from metadata.utils.filters import filter_by_pipeline
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@dataclass
class AirbyteSourceStatus(SourceStatus):
pipelines_scanned: List[str] = field(default_factory=list)
filtered: List[str] = field(default_factory=list)
def pipeline_scanned(self, topic: str) -> None:
self.pipelines_scanned.append(topic)
def dropped(self, topic: str) -> None:
self.filtered.append(topic)
STATUS_MAP = {
"cancelled": StatusType.Failed,
"succeeded": StatusType.Successful,
"failed": StatusType.Failed,
"running": StatusType.Pending,
"incomplete": StatusType.Failed,
"pending": StatusType.Pending,
}
class AirbyteSource(Source[CreatePipelineRequest]):
"""
Implements the necessary methods ot extract
Pipeline metadata from Airflow's metadata db
"""
config: WorkflowSource
report: AirbyteSourceStatus
def __init__(
self,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.source_config: PipelineServiceMetadataPipeline = (
self.config.sourceConfig.config
)
self.service_connection = self.config.serviceConnection.__root__.config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(self.metadata_config)
self.status = AirbyteSourceStatus()
self.service: PipelineService = self.metadata.get_service_or_create(
entity=PipelineService, config=config
)
self.client = AirbyteClient(self.service_connection)
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: AirbyteConnection = config.serviceConnection.__root__.config
if not isinstance(connection, AirbyteConnection):
raise InvalidSourceException(
f"Expected AirbyteConnection, but got {connection}"
)
return cls(config, metadata_config)
def prepare(self):
pass
def get_connections_jobs(self, connection: dict, connection_url: str):
"""
Returns the list of tasks linked to connection
"""
return [
Task(
name=job["job"]["id"],
displayName=job["job"]["id"],
description="",
taskUrl=f"{connection_url}/status",
taskType=job["job"]["configType"],
)
for job in self.client.list_jobs(connection.get("connectionId"))
if job
]
def fetch_pipeline(
self, connection: dict, workspace: dict
) -> Iterable[CreatePipelineRequest]:
"""
Convert a Connection into a Pipeline Entity
:param connection: connection object from airbyte
:param connection: workspace object from airbyte
:return: Create Pipeline request with tasks
"""
connection_url = f"/workspaces/{workspace.get('workspaceId')}/connections/{connection.get('connectionId')}"
yield CreatePipelineRequest(
name=connection.get("connectionId"),
displayName=connection.get("name"),
description="",
pipelineUrl=connection_url,
tasks=self.get_connections_jobs(connection, connection_url),
service=EntityReference(id=self.service.id, type="pipelineService"),
)
def fetch_pipeline_status(
self, connection: dict, pipeline_fqn: str
) -> OMetaPipelineStatus:
"""
Method to get task & pipeline status
"""
task_status = [
TaskStatus(
name=job["job"]["id"],
executionStatus=STATUS_MAP.get(
job["job"]["status"].lower(), StatusType.Pending
).value,
)
for job in self.client.list_jobs(connection.get("connectionId"))
]
pipeline_status = PipelineStatus(taskStatus=task_status)
yield OMetaPipelineStatus(
pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status
)
def fetch_lineage(
self, connection: dict, pipeline_entity: Pipeline
) -> Optional[Iterable[AddLineageRequest]]:
"""
Parse all the stream available in the connection and create a lineage between them
:param connection: connection object from airbyte
:param pipeline_entity: Pipeline we just ingested
:return: Lineage from inlets and outlets
"""
source_connection = self.client.get_source(connection.get("sourceId"))
destination_connection = self.client.get_destination(
connection.get("destinationId")
)
source_service = self.metadata.get_by_name(
entity=DatabaseService, fqn=source_connection.get("name")
)
destination_service = self.metadata.get_by_name(
entity=DatabaseService, fqn=destination_connection.get("name")
)
if not source_service or not destination_service:
return
for task in connection.get("syncCatalog", {}).get("streams") or []:
stream = task.get("stream")
from_fqn = fqn.build(
self.metadata,
Table,
table_name=stream.get("name"),
database_name=None,
schema_name=stream.get("namespace"),
service_name=source_connection.get("name"),
)
to_fqn = fqn.build(
self.metadata,
Table,
table_name=stream.get("name"),
database_name=None,
schema_name=stream.get("namespace"),
service_name=destination_connection.get("name"),
)
if not from_fqn and not to_fqn:
continue
from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn)
to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn)
yield AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=from_entity.id, type="table"),
toEntity=EntityReference(id=pipeline_entity.id, type="pipeline"),
)
)
yield AddLineageRequest(
edge=EntitiesEdge(
toEntity=EntityReference(id=to_entity.id, type="table"),
fromEntity=EntityReference(id=pipeline_entity.id, type="pipeline"),
)
)
def next_record(self) -> Iterable[Entity]:
"""
Extract metadata information to create Pipelines with Tasks
"""
for workspace in self.client.list_workspaces():
for connection in self.client.list_connections(
workflow_id=workspace.get("workspaceId")
):
try:
if filter_by_pipeline(
self.source_config.pipelineFilterPattern,
connection.get("connectionId"),
):
continue
yield from self.fetch_pipeline(connection, workspace)
pipeline_fqn = fqn.build(
self.metadata,
entity_type=Pipeline,
service_name=self.service.name.__root__,
pipeline_name=connection.get("connectionId"),
)
yield from self.fetch_pipeline_status(connection, pipeline_fqn)
if self.source_config.includeLineage:
pipeline_entity: Pipeline = self.metadata.get_by_name(
entity=Pipeline,
fqn=pipeline_fqn,
)
yield from self.fetch_lineage(connection, pipeline_entity) or []
except Exception as err:
logger.error(repr(err))
logger.debug(traceback.format_exc())
self.status.failure(connection.get("connectionId"), repr(err))
def get_status(self):
return self.status
def close(self):
pass
def test_connection(self) -> None:
pass

View File

@ -0,0 +1,82 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Client to interact with airbyte apis
"""
import json
from typing import List
from metadata.generated.schema.entity.services.connections.pipeline.airbyteConnection import (
AirbyteConnection,
)
from metadata.ingestion.ometa.client import REST, APIError, ClientConfig
class AirbyteClient:
def __init__(self, config: AirbyteConnection):
self.config = config
client_config: ClientConfig = ClientConfig(
base_url=self.config.hostPort,
api_version="api/v1",
auth_header="Authorization",
auth_token=lambda: ("no_token", 0),
)
self.client = REST(client_config)
def list_workspaces(self) -> List[dict]:
"""
Method returns the list of workflows
an airbyte instance can contain multiple workflows
"""
response = self.client.post(f"/workspaces/list")
if response.get("exceptionStack"):
raise APIError(response["message"])
return response.get("workspaces")
def list_connections(self, workflow_id: str) -> List[dict]:
"""
Method returns the list all of connections of workflow
"""
data = {"workspaceId": workflow_id}
response = self.client.post(f"/connections/list", data=json.dumps(data))
if response.get("exceptionStack"):
raise APIError(response["message"])
return response.get("connections")
def list_jobs(self, connection_id: str) -> List[dict]:
"""
Method returns the list all of jobs of a connection
"""
data = {"configId": connection_id, "configTypes": ["sync", "reset_connection"]}
response = self.client.post(f"/jobs/list", data=json.dumps(data))
if response.get("exceptionStack"):
raise APIError(response["message"])
return response.get("jobs")
def get_source(self, source_id: str) -> dict:
"""
Method returns source details
"""
data = {"sourceId": source_id}
response = self.client.post(f"/sources/get", data=json.dumps(data))
if response.get("exceptionStack"):
raise APIError(response["message"])
return response
def get_destination(self, destination_id: str) -> dict:
"""
Method returns destination details
"""
data = {"destinationId": destination_id}
response = self.client.post(f"/destinations/get", data=json.dumps(data))
if response.get("exceptionStack"):
raise APIError(response["message"])
return response