mirror of https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-06 12:36:56 +00:00

Added KafkaConnect Connector (#16217)

parent ad29964e81
commit 8dc623e280
@@ -215,6 +215,7 @@ plugins: Dict[str, Set[str]] = {
        "thrift-sasl~=0.4",
    },
    "kafka": {*COMMONS["kafka"]},
    "kafkaconnect": {"kafka-connect-py==0.10.11"},
    "kinesis": {VERSIONS["boto3"]},
    "looker": {
        VERSIONS["looker-sdk"],
30  ingestion/src/metadata/examples/workflows/kafka_connect.yaml  Normal file
@@ -0,0 +1,30 @@
source:
  type: kafkaconnect
  serviceName: kafka_connect_source
  serviceConnection:
    config:
      type: KafkaConnect
      # For KafkaConnect, choose one of noAuth or basic
      # For basic authentication:
      # authType:
      #   username: username
      #   password: password
      hostPort: http://localhost:8083
      verifySSL: true
      messagingServiceName: local_kafka_source
  sourceConfig:
    config:
      type: PipelineMetadata
      lineageInformation:
        dbServiceNames: []
        storageServiceNames: []
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  loggerLevel: INFO  # DEBUG, INFO, WARN or ERROR
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg
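A hedged sketch of running the example above programmatically (not part of this commit): it assumes the ingestion package is installed with the new extra added in setup.py above (e.g. pip install "openmetadata-ingestion[kafkaconnect]") and that the YAML is saved as kafka_connect.yaml; MetadataWorkflow is the standard runner shipped with the ingestion package.

import yaml

from metadata.workflow.metadata import MetadataWorkflow

# Load the workflow definition shown above and run it end to end.
with open("kafka_connect.yaml", encoding="utf-8") as file:
    config = yaml.safe_load(file)

workflow = MetadataWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()  # fail loudly if any step errored
workflow.print_status()
workflow.stop()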
278  ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py  Normal file
@@ -0,0 +1,278 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Client to interact with Kafka Connect REST APIs
"""

import traceback
from typing import List, Optional

from kafka_connect import KafkaConnect

from metadata.generated.schema.entity.services.connections.pipeline.kafkaConnectConnection import (
    KafkaConnectConnection,
)
from metadata.ingestion.source.pipeline.kafkaconnect.models import (
    KafkaConnectDatasetDetails,
    KafkaConnectPipelineDetails,
    KafkaConnectTasks,
    KafkaConnectTopics,
)
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ometa_logger

logger = ometa_logger()

SUPPORTED_DATASETS = {
    "table": [
        "table",
        "collection",
        "snowflake.schema.name",
        "table.whitelist",
        "fields.whitelist",
    ],
    "database": ["database", "db.name", "snowflake.database.name"],
    "container_name": ["s3.bucket.name"],
}


class KafkaConnectClient:
    """
    Wrapper on top of the Kafka Connect REST API
    """

    def __init__(self, config: KafkaConnectConnection):
        url = clean_uri(config.hostPort)
        auth = None
        ssl_verify = config.verifySSL
        if config.KafkaConnectConfig:
            auth = f"{config.KafkaConnectConfig.username}:{config.KafkaConnectConfig.password}"
        self.client = KafkaConnect(url=url, auth=auth, ssl_verify=ssl_verify)

    def get_cluster_info(self) -> Optional[dict]:
        """
        Get the version and other details of the Kafka Connect cluster.
        """
        try:
            return self.client.get_cluster_info()
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(f"Unable to get cluster info: {exc}")
        return None

    def get_connectors(
        self,
        expand: Optional[str] = None,
        pattern: Optional[str] = None,
        state: Optional[str] = None,
    ) -> Optional[dict]:
        """
        Get the list of connectors.
        Args:
            expand (str): Optional parameter that retrieves additional
                information about the connectors.
                Valid values are "status" and "info".
            pattern (str): Only list connectors that match the regex pattern.
            state (str): Only list connectors that match the state.
        """
        try:
            return self.client.list_connectors(
                expand=expand, pattern=pattern, state=state
            )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(f"Unable to get connectors list: {exc}")
        return None

    def get_connector_plugins(self) -> Optional[dict]:
        """
        Get the list of connector plugins.
        """
        try:
            return self.client.list_connector_plugins()
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Unable to get connector plugins: {exc}")
        return None

    def get_connector_config(self, connector: str) -> Optional[dict]:
        """
        Get the details of a single connector.
        Args:
            connector (str): The name of the connector.
        """
        try:
            result = self.client.get_connector(connector=connector)
            if result:
                return result.get("config")
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(f"Unable to get connector configuration details: {exc}")
        return None

    def get_connector_dataset_info(
        self, connector: str
    ) -> Optional[KafkaConnectDatasetDetails]:
        """
        Get the dataset details of a connector, if there are any.
        Scans the connector configuration for known dataset fields
        and returns whatever matches are found.
        Args:
            connector (str): The name of the connector.
        Returns:
            Optional[KafkaConnectDatasetDetails]: The dataset information
            (table, database, or container name) if a dataset is found,
            or None if the connector is not found, has no dataset,
            or an error occurs.
        """
        try:
            conn_config = self.get_connector_config(connector=connector)
            if not conn_config:
                return None

            result = {}
            for dataset, keys in SUPPORTED_DATASETS.items():
                for key in keys:
                    if conn_config.get(key):
                        result[dataset] = conn_config[key]

            return KafkaConnectDatasetDetails(**result)
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Unable to get connector dataset details: {exc}")
        return None
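    # Added illustration (not in the original commit): for a hypothetical JDBC
    # sink whose config contains {"database": "shop", "table": "orders"}, the
    # SUPPORTED_DATASETS scan above produces
    # KafkaConnectDatasetDetails(table="orders", database="shop"); its
    # dataset_type property (see models.py) then resolves to the Table entity.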
    def get_connector_tasks(self, connector: str) -> Optional[List[KafkaConnectTasks]]:
        """
        Get the list of tasks for a connector.
        Args:
            connector (str): The name of the connector.
        Returns:
            Optional[List[KafkaConnectTasks]]: A list of KafkaConnectTasks
            objects representing the connector's tasks, or None if the
            connector is not found or an error occurs.
        """
        try:
            result = self.client.get_connector_status(connector=connector)
            return [KafkaConnectTasks(**task) for task in result.get("tasks") or []]
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(f"Unable to get connector tasks list: {exc}")
        return None

    def get_connector_task_status(self, connector: str, task_id: int) -> Optional[dict]:
        """
        Get the status of a specific task for a connector.
        Args:
            connector (str): The name of the connector.
            task_id (int): The ID of the task.
        Returns:
            Optional[dict]: A dictionary containing the task status
            information, or None if the connector or task is not found
            or an error occurs.
        """
        try:
            return self.client.get_connector_task_status(
                connector=connector, task_id=task_id
            )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(f"Unable to get connector task status: {exc}")
        return None

    def get_connector_topics(
        self, connector: str
    ) -> Optional[List[KafkaConnectTopics]]:
        """
        Get the list of topics for a connector.

        Args:
            connector (str): The name of the connector.

        Returns:
            Optional[List[KafkaConnectTopics]]: A list of KafkaConnectTopics
            objects representing the connector's topics, or None if the
            connector is not found or an error occurs.
        """
        try:
            result = self.client.list_connector_topics(connector=connector).get(
                connector
            )
            if result:
                return [
                    KafkaConnectTopics(name=topic)
                    for topic in result.get("topics") or []
                ]
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Unable to get connector topics: {exc}")
        return None

    def get_connector_state(self, connector: str) -> Optional[str]:
        """
        Get the status of a single connector.
        Args:
            connector (str): The name of the connector.
        """
        try:
            result = self.client.get_connector_status(connector=connector)
            if result.get("connector"):
                return result["connector"].get("state")
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Unable to get connector state due to {exc}")
        return None

    def get_connector_list(self) -> Optional[List[KafkaConnectPipelineDetails]]:
        """
        Get the details of every connector in the cluster.
        Returns:
            Optional[List[KafkaConnectPipelineDetails]]: A list of
            KafkaConnectPipelineDetails objects, one per connector,
            or None if an error occurs.
        """
        try:
            connectors = []
            for connector in self.get_connectors() or []:
                result = self.client.get_connector_status(connector=connector)
                connector_details = KafkaConnectPipelineDetails(**result)
                connector_details.status = self.get_connector_state(connector=connector)
                connector_details.tasks = self.get_connector_tasks(connector=connector)
                connector_details.topics = self.get_connector_topics(
                    connector=connector
                )
                connectors.append(connector_details)
            return connectors
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(f"Unable to get connector information: {exc}")
        return None
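For orientation, a minimal usage sketch of the client above (added commentary, not part of the commit), assuming a Connect worker on localhost:8083 without authentication, as in the example workflow YAML:

from metadata.generated.schema.entity.services.connections.pipeline.kafkaConnectConnection import (
    KafkaConnectConnection,
)
from metadata.ingestion.source.pipeline.kafkaconnect.client import KafkaConnectClient

# Connection values mirror the serviceConnection block of the YAML example.
config = KafkaConnectConnection(
    hostPort="http://localhost:8083",
    verifySSL=False,
    messagingServiceName="local_kafka_source",
)
client = KafkaConnectClient(config)

print(client.get_cluster_info())  # e.g. {"version": "...", "kafka_cluster_id": "..."}
for details in client.get_connector_list() or []:
    print(details.name, details.status, [topic.name for topic in details.topics or []])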
58  ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/connection.py  Normal file
@@ -0,0 +1,58 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Source connection handler
"""

from typing import Optional

from metadata.generated.schema.entity.automations.workflow import (
    Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.pipeline.kafkaConnectConnection import (
    KafkaConnectConnection,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.kafkaconnect.client import KafkaConnectClient


def get_connection(connection: KafkaConnectConnection) -> KafkaConnectClient:
    """
    Create connection
    """
    return KafkaConnectClient(connection)


def test_connection(
    metadata: OpenMetadata,
    client: KafkaConnectClient,
    service_connection: KafkaConnectConnection,
    automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
    """
    Test connection. This can be executed either as part
    of a metadata workflow or during an Automation Workflow
    """

    test_fn = {
        "GetClusterInfo": client.get_cluster_info,
        "GetPipelines": client.get_connectors,
        "GetPlugins": client.get_connector_plugins,
    }

    test_connection_steps(
        metadata=metadata,
        test_fn=test_fn,
        service_type=service_connection.type.value,
        automation_workflow=automation_workflow,
    )
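A hedged sketch (not part of this commit) of how these two helpers are typically wired together; the server settings below are illustrative placeholders mirroring the workflowConfig block of the example YAML, and service_connection stands in for a KafkaConnectConnection built as shown earlier.

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata

# Illustrative server settings; real values come from workflowConfig above.
server_config = OpenMetadataConnection(
    hostPort="http://localhost:8585/api",
    authProvider="openmetadata",
    securityConfig=OpenMetadataJWTClientConfig(jwtToken="<jwt-token>"),
)
metadata = OpenMetadata(server_config)

client = get_connection(connection=service_connection)  # KafkaConnectConnection
test_connection(
    metadata=metadata,
    client=client,
    service_connection=service_connection,
)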
319  ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py  Normal file
@@ -0,0 +1,319 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
KafkaConnect source to extract pipeline metadata
"""
import traceback
from datetime import datetime
from typing import Iterable, Optional

from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.container import Container
from metadata.generated.schema.entity.data.pipeline import (
    Pipeline,
    PipelineStatus,
    StatusType,
    Task,
    TaskStatus,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.connections.pipeline.kafkaConnectConnection import (
    KafkaConnectConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
    StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata, T
from metadata.ingestion.source.pipeline.kafkaconnect.models import (
    KafkaConnectPipelineDetails,
)
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils import fqn
from metadata.utils.constants import ENTITY_REFERENCE_TYPE_MAP
from metadata.utils.helpers import clean_uri, datetime_to_ts
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()

STATUS_MAP = {
    "RUNNING": StatusType.Successful.value,
    "FAILED": StatusType.Failed.value,
    "PAUSED": StatusType.Pending.value,
    "UNASSIGNED": StatusType.Pending.value,
}
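# Added note (not in the original commit): Kafka Connect reports connector and
# task states such as RUNNING, FAILED, PAUSED and UNASSIGNED; any state missing
# from STATUS_MAP falls back to StatusType.Pending in yield_pipeline_status.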
class KafkaconnectSource(PipelineServiceSource):
    """
    Implements the necessary methods to extract
    Pipeline metadata from Kafka Connect
    """

    @classmethod
    def create(
        cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
    ):
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        connection: KafkaConnectConnection = config.serviceConnection.__root__.config
        if not isinstance(connection, KafkaConnectConnection):
            raise InvalidSourceException(
                f"Expected KafkaConnectConnection, but got {connection}"
            )
        return cls(config, metadata)

    def yield_pipeline(
        self, pipeline_details: KafkaConnectPipelineDetails
    ) -> Iterable[Either[CreatePipelineRequest]]:
        """
        Method to Get Pipeline Entity
        """
        try:
            connection_url = clean_uri(self.service_connection.hostPort)

            pipeline_request = CreatePipelineRequest(
                name=pipeline_details.name,
                sourceUrl=connection_url,
                tasks=[
                    Task(
                        name=task.id,
                    )
                    for task in pipeline_details.tasks or []
                ],
                service=self.context.get().pipeline_service,
            )
            yield Either(right=pipeline_request)
            self.register_record(pipeline_request=pipeline_request)
        except Exception as exc:
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.name,
                    error=f"Wild error ingesting pipeline {pipeline_details} - {exc}",
                    stackTrace=traceback.format_exc(),
                )
            )

    def get_dataset_entity(
        self, pipeline_details: KafkaConnectPipelineDetails
    ) -> Optional[T]:
        """
        Get lineage dataset entity
        """
        try:
            dataset_details = self.client.get_connector_dataset_info(
                connector=pipeline_details.name
            )
            if dataset_details:
                if (
                    dataset_details.dataset_type is Table
                    and self.source_config.lineageInformation.dbServiceNames
                ):
                    for dbservicename in (
                        self.source_config.lineageInformation.dbServiceNames or []
                    ):
                        dataset_entity = self.metadata.get_by_name(
                            entity=dataset_details.dataset_type,
                            fqn=fqn.build(
                                metadata=self.metadata,
                                entity_type=dataset_details.dataset_type,
                                table_name=dataset_details.table,
                                database_name=None,
                                schema_name=dataset_details.database,
                                service_name=dbservicename,
                            ),
                        )

                        if dataset_entity:
                            return dataset_entity

                if (
                    dataset_details.dataset_type is Container
                    and self.source_config.lineageInformation.storageServiceNames
                ):
                    for storageservicename in (
                        self.source_config.lineageInformation.storageServiceNames or []
                    ):
                        dataset_entity = self.metadata.get_by_name(
                            entity=dataset_details.dataset_type,
                            fqn=fqn.build(
                                metadata=self.metadata,
                                entity_type=dataset_details.dataset_type,
                                container_name=dataset_details.container_name,
                                service_name=storageservicename,
                            ),
                        )

                        if dataset_entity:
                            return dataset_entity

        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Unable to get dataset entity: {exc}")

        return None

    def yield_pipeline_lineage_details(
        self, pipeline_details: KafkaConnectPipelineDetails
    ) -> Iterable[Either[AddLineageRequest]]:
        """
        Get lineage between pipeline and data sources
        """
        try:
            if not self.service_connection.messagingServiceName:
                logger.debug("Kafka messagingServiceName not found")
                return None

            pipeline_fqn = fqn.build(
                metadata=self.metadata,
                entity_type=Pipeline,
                service_name=self.context.get().pipeline_service,
                pipeline_name=self.context.get().pipeline,
            )

            pipeline_entity = self.metadata.get_by_name(
                entity=Pipeline, fqn=pipeline_fqn
            )

            lineage_details = LineageDetails(
                pipeline=EntityReference(
                    id=pipeline_entity.id.__root__, type="pipeline"
                ),
                source=LineageSource.PipelineLineage,
            )

            dataset_entity = self.get_dataset_entity(pipeline_details=pipeline_details)

            for topic in pipeline_details.topics or []:
                topic_fqn = fqn.build(
                    metadata=self.metadata,
                    entity_type=Topic,
                    service_name=self.service_connection.messagingServiceName,
                    topic_name=topic.name,
                )

                topic_entity = self.metadata.get_by_name(entity=Topic, fqn=topic_fqn)

                if topic_entity is None or dataset_entity is None:
                    continue
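                # Added note (not in the original commit): a sink connector
                # consumes from Kafka and writes to the external system, so
                # lineage flows topic -> dataset; a source connector is the
                # reverse, dataset -> topic.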
                if pipeline_details.conn_type.lower() == "sink":
                    from_entity, to_entity = topic_entity, dataset_entity
                else:
                    from_entity, to_entity = dataset_entity, topic_entity

                yield Either(
                    right=AddLineageRequest(
                        edge=EntitiesEdge(
                            fromEntity=EntityReference(
                                id=from_entity.id,
                                type=ENTITY_REFERENCE_TYPE_MAP[
                                    type(from_entity).__name__
                                ],
                            ),
                            toEntity=EntityReference(
                                id=to_entity.id,
                                type=ENTITY_REFERENCE_TYPE_MAP[
                                    type(to_entity).__name__
                                ],
                            ),
                            lineageDetails=lineage_details,
                        )
                    )
                )
        except Exception as exc:
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.name,
                    error=f"Wild error ingesting pipeline lineage {pipeline_details} - {exc}",
                    stackTrace=traceback.format_exc(),
                )
            )

        return None

    def get_pipelines_list(self) -> Iterable[KafkaConnectPipelineDetails]:
        """
        Get List of all pipelines
        """
        try:
            yield from self.client.get_connector_list() or []
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(f"Failed to get pipeline list due to: {exc}")

    def get_pipeline_name(self, pipeline_details: KafkaConnectPipelineDetails) -> str:
        """
        Get Pipeline Name
        """
        try:
            return pipeline_details.name
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(f"Failed to get pipeline name due to: {exc}")

        return None

    def yield_pipeline_status(
        self, pipeline_details: KafkaConnectPipelineDetails
    ) -> Iterable[Either[OMetaPipelineStatus]]:
        """
        Get Pipeline Status
        """
        try:
            task_status = [
                TaskStatus(
                    name=task.id,
                    executionStatus=STATUS_MAP.get(task.state, StatusType.Pending),
                )
                for task in pipeline_details.tasks or []
            ]

            pipeline_status = PipelineStatus(
                executionStatus=STATUS_MAP.get(
                    pipeline_details.status, StatusType.Pending
                ),
                taskStatus=task_status,
                # Kafka Connect doesn't provide any execution time details
                timestamp=datetime_to_ts(datetime.now()),
            )

            pipeline_fqn = fqn.build(
                metadata=self.metadata,
                entity_type=Pipeline,
                service_name=self.context.get().pipeline_service,
                pipeline_name=self.context.get().pipeline,
            )
            yield Either(
                right=OMetaPipelineStatus(
                    pipeline_fqn=pipeline_fqn,
                    pipeline_status=pipeline_status,
                )
            )

        except Exception as exc:
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.name,
                    error=f"Wild error ingesting pipeline status {pipeline_details} - {exc}",
                    stackTrace=traceback.format_exc(),
                )
            )
62  ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py  Normal file
@@ -0,0 +1,62 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
KafkaConnect Source Model module
"""

from typing import List, Optional

from pydantic import BaseModel, Field

from metadata.generated.schema.entity.data.container import Container
from metadata.generated.schema.entity.data.table import Table


class KafkaConnectTasks(BaseModel):
    id: int = Field(..., description="ID of the task")
    state: Optional[str] = Field(
        default="UNASSIGNED", description="State of the task (e.g., RUNNING, STOPPED)"
    )
    worker_id: Optional[str] = Field(
        default=None, description="ID of the worker running the task"
    )


class KafkaConnectTopics(BaseModel):
    name: str = Field(..., description="Name of the topic (e.g., random-source-avro)")


class KafkaConnectPipelineDetails(BaseModel):
    name: str = Field(
        ..., description="Name of the connector (e.g., random-source-json)"
    )
    status: Optional[str] = Field(
        default="UNASSIGNED",
        description="State of the connector (e.g., RUNNING, STOPPED)",
    )
    tasks: Optional[List[KafkaConnectTasks]]
    topics: Optional[List[KafkaConnectTopics]]
    conn_type: Optional[str] = Field(..., alias="type")


class KafkaConnectDatasetDetails(BaseModel):
    table: Optional[str]
    database: Optional[str]
    container_name: Optional[str]

    @property
    def dataset_type(self):
        if self.table or self.database:
            return Table
        if self.container_name:
            return Container
        return None
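A small hedged illustration (added for clarity, not part of the commit) of how the dataset_type property picks the lineage target class from whichever config keys matched in the client's SUPPORTED_DATASETS scan:

from metadata.generated.schema.entity.data.container import Container
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.source.pipeline.kafkaconnect.models import (
    KafkaConnectDatasetDetails,
)

# JDBC-style connector configs resolve to a Table lineage target...
assert KafkaConnectDatasetDetails(table="orders", database="shop").dataset_type is Table
# ...an S3 sink resolves to a Container...
assert KafkaConnectDatasetDetails(container_name="my-bucket").dataset_type is Container
# ...and a connector with no recognised dataset keys has no target at all.
assert KafkaConnectDatasetDetails().dataset_type is None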
@@ -707,6 +707,10 @@ site_menu:
    url: /connectors/pipeline/dagster
  - category: Connectors / Pipeline / Dagster / Run Externally
    url: /connectors/pipeline/dagster/yaml
  - category: Connectors / Pipeline / KafkaConnect
    url: /connectors/pipeline/kafkaconnect
  - category: Connectors / Pipeline / KafkaConnect / Run Externally
    url: /connectors/pipeline/kafkaconnect/yaml
  - category: Connectors / Pipeline / Databricks Pipeline
    url: /connectors/pipeline/databricks-pipeline
  - category: Connectors / Pipeline / Databricks Pipeline / Run Externally
@@ -22,6 +22,7 @@ import databricksPipelineConnection from '../jsons/connectionSchemas/connections
import domoPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/domoPipelineConnection.json';
import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
import gluePipelineConnection from '../jsons/connectionSchemas/connections/pipeline/gluePipelineConnection.json';
import KafkaConnectConnection from '../jsons/connectionSchemas/connections/pipeline/kafkaConnectConnection.json';
import nifiConnection from '../jsons/connectionSchemas/connections/pipeline/nifiConnection.json';
import openLineageConnection from '../jsons/connectionSchemas/connections/pipeline/openLineageConnection.json';
import splineConnection from '../jsons/connectionSchemas/connections/pipeline/splineConnection.json';
@@ -46,6 +47,11 @@ export const getPipelineConfig = (type: PipelineServiceType) => {
        break;
      }
      case PipelineServiceType.KafkaConnect: {
        schema = KafkaConnectConnection;

        break;
      }
      case PipelineServiceType.Fivetran: {
        schema = fivetranConnection;
@@ -117,7 +117,6 @@ class ServiceUtilClassBase {
    DatabaseServiceType.Dbt,
    StorageServiceType.Gcs,
    MetadataServiceType.Alation,
    PipelineServiceType.KafkaConnect,
  ];

  protected updateUnsupportedServices(types: string[]) {
@@ -334,6 +333,9 @@ class ServiceUtilClassBase {
      case PipelineServiceType.GluePipeline:
        return GLUE;

      case PipelineServiceType.KafkaConnect:
        return KAFKA;

      case PipelineServiceType.Spark:
        return SPARK;