Fix Kafkaconnect validation errors (#19401)

This commit is contained in:
Suman Maharana 2025-01-20 11:58:00 +05:30 committed by GitHub
parent e5acdab3b8
commit 1112de22bc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 10 additions and 10 deletions

View File

@ -95,7 +95,7 @@ class KafkaconnectSource(PipelineServiceSource):
sourceUrl=connection_url, sourceUrl=connection_url,
tasks=[ tasks=[
Task( Task(
name=task.id, name=str(task.id),
) )
for task in pipeline_details.tasks or [] for task in pipeline_details.tasks or []
], ],
@ -205,7 +205,7 @@ class KafkaconnectSource(PipelineServiceSource):
metadata=self.metadata, metadata=self.metadata,
entity_type=Topic, entity_type=Topic,
service_name=self.service_connection.messagingServiceName, service_name=self.service_connection.messagingServiceName,
topic_name=topic.name, topic_name=str(topic.name),
) )
topic_entity = self.metadata.get_by_name(entity=Topic, fqn=topic_fqn) topic_entity = self.metadata.get_by_name(entity=Topic, fqn=topic_fqn)
@ -279,7 +279,7 @@ class KafkaconnectSource(PipelineServiceSource):
try: try:
task_status = [ task_status = [
TaskStatus( TaskStatus(
name=task.id, name=str(task.id),
executionStatus=STATUS_MAP.get(task.state, StatusType.Pending), executionStatus=STATUS_MAP.get(task.state, StatusType.Pending),
) )
for task in pipeline_details.tasks or [] for task in pipeline_details.tasks or []

View File

@ -27,7 +27,7 @@ class KafkaConnectTasks(BaseModel):
default="UNASSIGNED", description="State of the task (e.g., RUNNING, STOPPED)" default="UNASSIGNED", description="State of the task (e.g., RUNNING, STOPPED)"
) )
worker_id: Optional[str] = Field( worker_id: Optional[str] = Field(
..., description="ID of the worker running the task" default=None, description="ID of the worker running the task"
) )
@ -43,15 +43,15 @@ class KafkaConnectPipelineDetails(BaseModel):
default="UNASSIGNED", default="UNASSIGNED",
description="State of the connector (e.g., RUNNING, STOPPED)", description="State of the connector (e.g., RUNNING, STOPPED)",
) )
tasks: Optional[List[KafkaConnectTasks]] tasks: Optional[List[KafkaConnectTasks]] = []
topics: Optional[List[KafkaConnectTopics]] topics: Optional[List[KafkaConnectTopics]] = []
conn_type: Optional[str] = Field(..., alias="type") conn_type: Optional[str] = Field(default="UNKNOWN", alias="type")
class KafkaConnectDatasetDetails(BaseModel): class KafkaConnectDatasetDetails(BaseModel):
table: Optional[str] table: Optional[str] = None
database: Optional[str] database: Optional[str] = None
container_name: Optional[str] container_name: Optional[str] = None
@property @property
def dataset_type(self): def dataset_type(self):