From 1112de22bc85d0f7f194e8d93cc65160e1261015 Mon Sep 17 00:00:00 2001 From: Suman Maharana Date: Mon, 20 Jan 2025 11:58:00 +0530 Subject: [PATCH] Fix Kafkaconnect validation errors (#19401) --- .../source/pipeline/kafkaconnect/metadata.py | 6 +++--- .../source/pipeline/kafkaconnect/models.py | 14 +++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py index 620f2416033..ce37ca3f285 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py @@ -95,7 +95,7 @@ class KafkaconnectSource(PipelineServiceSource): sourceUrl=connection_url, tasks=[ Task( - name=task.id, + name=str(task.id), ) for task in pipeline_details.tasks or [] ], @@ -205,7 +205,7 @@ class KafkaconnectSource(PipelineServiceSource): metadata=self.metadata, entity_type=Topic, service_name=self.service_connection.messagingServiceName, - topic_name=topic.name, + topic_name=str(topic.name), ) topic_entity = self.metadata.get_by_name(entity=Topic, fqn=topic_fqn) @@ -279,7 +279,7 @@ class KafkaconnectSource(PipelineServiceSource): try: task_status = [ TaskStatus( - name=task.id, + name=str(task.id), executionStatus=STATUS_MAP.get(task.state, StatusType.Pending), ) for task in pipeline_details.tasks or [] diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py index 55109d02d71..9de270f18a2 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py @@ -27,7 +27,7 @@ class KafkaConnectTasks(BaseModel): default="UNASSIGNED", description="State of the task (e.g., RUNNING, STOPPED)" ) worker_id: Optional[str] = Field( - ..., description="ID of the worker running the task" + default=None, description="ID of the worker running the task" ) @@ -43,15 +43,15 @@ class KafkaConnectPipelineDetails(BaseModel): default="UNASSIGNED", description="State of the connector (e.g., RUNNING, STOPPED)", ) - tasks: Optional[List[KafkaConnectTasks]] - topics: Optional[List[KafkaConnectTopics]] - conn_type: Optional[str] = Field(..., alias="type") + tasks: Optional[List[KafkaConnectTasks]] = [] + topics: Optional[List[KafkaConnectTopics]] = [] + conn_type: Optional[str] = Field(default="UNKNOWN", alias="type") class KafkaConnectDatasetDetails(BaseModel): - table: Optional[str] - database: Optional[str] - container_name: Optional[str] + table: Optional[str] = None + database: Optional[str] = None + container_name: Optional[str] = None @property def dataset_type(self):