MINOR: Fix databricks pipeline repeating tasks issue (#23851)

Author: Mayur Singal
Date: 2025-10-13 00:41:05 +05:30
Committed by: GitHub
parent c8722faf47
commit a638bdcfe0
3 changed files with 37 additions and 60 deletions

View File

@@ -215,38 +215,25 @@ class DatabrickspipelineSource(PipelineServiceSource):
     def get_tasks(self, pipeline_details: DataBrickPipelineDetails) -> List[Task]:
         try:
-            if not pipeline_details.job_id:
-                return []
-            task_list = []
-            for run in self.client.get_job_runs(job_id=pipeline_details.job_id) or []:
-                run = DBRun(**run)
-                task_list.extend(
-                    [
-                        Task(
-                            name=str(task.name),
-                            taskType=(
-                                pipeline_details.settings.task_type
-                                if pipeline_details.settings
-                                else None
-                            ),
-                            sourceUrl=(
-                                SourceUrl(run.run_page_url)
-                                if run.run_page_url
-                                else None
-                            ),
-                            description=(
-                                Markdown(task.description) if task.description else None
-                            ),
-                            downstreamTasks=[
-                                depend_task.name
-                                for depend_task in task.depends_on or []
-                            ],
-                        )
-                        for task in run.tasks or []
-                    ]
-                )
-            return task_list
+            if not pipeline_details.settings or not pipeline_details.settings.tasks:
+                return None
+            job_url = f"https://{self.service_connection.hostPort}/#job/{pipeline_details.job_id}"
+            return [
+                Task(
+                    name=str(task.name),
+                    taskType=pipeline_details.settings.task_type,
+                    sourceUrl=SourceUrl(job_url),
+                    description=(
+                        Markdown(task.description) if task.description else None
+                    ),
+                    downstreamTasks=[
+                        depend_task.name for depend_task in task.depends_on or []
+                    ],
+                )
+                for task in pipeline_details.settings.tasks
+            ]
         except Exception as exc:
             logger.debug(traceback.format_exc())
             logger.warning(f"Failed to get tasks list due to : {exc}")

View File

@ -32,20 +32,14 @@ class PipelineTask(BaseModel):
full_refresh: Optional[bool] = None full_refresh: Optional[bool] = None
class DBJobTask(BaseModel):
name: Optional[str] = Field(None, alias="task_key")
description: Optional[str] = None
depends_on: Optional[List[DependentTask]] = None
pipeline_task: Optional[PipelineTask] = None
notebook_task: Optional[Dict[str, Any]] = None
spark_python_task: Optional[Dict[str, Any]] = None
class DBTasks(BaseModel): class DBTasks(BaseModel):
name: Optional[str] = Field(None, alias="task_key") name: Optional[str] = Field(None, alias="task_key")
description: Optional[str] = None description: Optional[str] = None
depends_on: Optional[List[DependentTask]] = None depends_on: Optional[List[DependentTask]] = None
run_page_url: Optional[str] = None run_page_url: Optional[str] = None
pipeline_task: Optional[PipelineTask] = None
notebook_task: Optional[Dict[str, Any]] = None
spark_python_task: Optional[Dict[str, Any]] = None
class DBSettings(BaseModel): class DBSettings(BaseModel):
@ -55,7 +49,7 @@ class DBSettings(BaseModel):
description: Optional[str] = None description: Optional[str] = None
schedule: Optional[DBRunSchedule] = None schedule: Optional[DBRunSchedule] = None
task_type: Optional[str] = Field(None, alias="format") task_type: Optional[str] = Field(None, alias="format")
tasks: Optional[List[DBJobTask]] = None tasks: Optional[List[DBTasks]] = None
class DataBrickPipelineDetails(BaseModel): class DataBrickPipelineDetails(BaseModel):
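
For context, the redundant DBJobTask model is folded into DBTasks, which keeps mapping the Jobs API's "task_key" field onto name via a pydantic alias while absorbing the pipeline/notebook/spark task fields. A trimmed, runnable sketch of that alias behavior (pydantic v2 assumed; the payload values are made up):

from typing import Any, Dict, Optional

from pydantic import BaseModel, Field


class DBTasks(BaseModel):
    # "task_key" in the Databricks payload populates "name" here.
    name: Optional[str] = Field(None, alias="task_key")
    description: Optional[str] = None
    notebook_task: Optional[Dict[str, Any]] = None


payload = {"task_key": "Sessionize", "notebook_task": {"notebook_path": "/demo"}}
print(DBTasks(**payload).name)  # Sessionize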

View File

@@ -147,27 +147,27 @@ EXPECTED_CREATED_PIPELINES = CreatePipelineRequest(
     displayName="OpenMetadata Databricks Workflow",
     description="This job contain multiple tasks that are required to produce the weekly shark sightings report.",
     tasks=[
-        Task(
-            name="Orders_Ingest",
-            description="Ingests order data",
-            sourceUrl="https://my-workspace.cloud.databricks.com/#job/11223344/run/123",
-            downstreamTasks=[],
-            taskType="SINGLE_TASK",
-        ),
-        Task(
-            name="Match",
-            description="Matches orders with user sessions",
-            sourceUrl="https://my-workspace.cloud.databricks.com/#job/11223344/run/123",
-            downstreamTasks=["Orders_Ingested", "Sessionize"],
-            taskType="SINGLE_TASK",
-        ),
         Task(
             name="Sessionize",
             description="Extracts session data from events",
-            sourceUrl="https://my-workspace.cloud.databricks.com/#job/11223344/run/123",
+            sourceUrl="https://localhost:443/#job/11223344",
             downstreamTasks=[],
             taskType="SINGLE_TASK",
         ),
+        Task(
+            name="Orders_Ingest",
+            description="Ingests order data",
+            sourceUrl="https://localhost:443/#job/11223344",
+            downstreamTasks=[],
+            taskType="SINGLE_TASK",
+        ),
+        Task(
+            name="Matched_Changed",
+            description="Matches orders with user sessions",
+            sourceUrl="https://localhost:443/#job/11223344",
+            downstreamTasks=["Orders_Ingest", "Sessionize", "Sessionize_duplicated"],
+            taskType="SINGLE_TASK",
+        ),
     ],
     scheduleInterval="20 30 * * * ?",
     service=FullyQualifiedEntityName(root="databricks_pipeline_test"),
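
The updated expectations also reflect the new sourceUrl: instead of a per-run page URL, every task now links to the job page built from the connection's hostPort and the job id (the test fixture's hostPort is localhost:443). A standalone reproduction of that f-string with the test's values:

# Reproduces the job_url construction from get_tasks with the fixture's values.
host_port = "localhost:443"  # self.service_connection.hostPort in the source
job_id = 11223344            # pipeline_details.job_id in the source
print(f"https://{host_port}/#job/{job_id}")  # https://localhost:443/#job/11223344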
@@ -279,11 +279,7 @@ class DatabricksPipelineTests(TestCase):
         results = list(self.databricks.get_pipelines_list())
         self.assertEqual(PIPELINE_LIST, results)
 
-    @patch(
-        "metadata.ingestion.source.database.databricks.client.DatabricksClient.get_job_runs"
-    )
-    def test_yield_pipeline(self, get_job_runs):
-        get_job_runs.return_value = mock_run_data
+    def test_yield_pipeline(self):
         pipelines = list(self.databricks.yield_pipeline(PIPELINE_LIST[0]))[0].right
         self.assertEqual(pipelines, EXPECTED_CREATED_PIPELINES)
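
Since get_tasks no longer calls client.get_job_runs, the test drops the @patch decorator and the mock_run_data wiring. A small guard that would have caught the original regression, duplicate task names in the produced request, could look like this (hypothetical helper, not part of the commit):

from collections import Counter


def assert_unique_task_names(task_names):
    # Fails if any task name appears more than once, as it did before the fix.
    dupes = [name for name, count in Counter(task_names).items() if count > 1]
    assert not dupes, f"duplicated tasks: {dupes}"


assert_unique_task_names(["Sessionize", "Orders_Ingest", "Matched_Changed"])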