diff --git a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py
index f962dd3a1c3..0aaedf7d35d 100644
--- a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py
@@ -215,38 +215,25 @@ class DatabrickspipelineSource(PipelineServiceSource):
 
     def get_tasks(self, pipeline_details: DataBrickPipelineDetails) -> List[Task]:
         try:
-            if not pipeline_details.job_id:
-                return []
+            if not pipeline_details.settings or not pipeline_details.settings.tasks:
+                return None
 
-            task_list = []
-            for run in self.client.get_job_runs(job_id=pipeline_details.job_id) or []:
-                run = DBRun(**run)
-                task_list.extend(
-                    [
-                        Task(
-                            name=str(task.name),
-                            taskType=(
-                                pipeline_details.settings.task_type
-                                if pipeline_details.settings
-                                else None
-                            ),
-                            sourceUrl=(
-                                SourceUrl(run.run_page_url)
-                                if run.run_page_url
-                                else None
-                            ),
-                            description=(
-                                Markdown(task.description) if task.description else None
-                            ),
-                            downstreamTasks=[
-                                depend_task.name
-                                for depend_task in task.depends_on or []
-                            ],
-                        )
-                        for task in run.tasks or []
-                    ]
+            job_url = f"https://{self.service_connection.hostPort}/#job/{pipeline_details.job_id}"
+
+            return [
+                Task(
+                    name=str(task.name),
+                    taskType=pipeline_details.settings.task_type,
+                    sourceUrl=SourceUrl(job_url),
+                    description=(
+                        Markdown(task.description) if task.description else None
+                    ),
+                    downstreamTasks=[
+                        depend_task.name for depend_task in task.depends_on or []
+                    ],
                 )
-            return task_list
+                for task in pipeline_details.settings.tasks
+            ]
         except Exception as exc:
             logger.debug(traceback.format_exc())
             logger.warning(f"Failed to get tasks list due to : {exc}")
diff --git a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/models.py b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/models.py
index 2bccf7ab3f3..4486704f1cd 100644
--- a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/models.py
+++ b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/models.py
@@ -32,20 +32,14 @@ class PipelineTask(BaseModel):
     full_refresh: Optional[bool] = None
 
 
-class DBJobTask(BaseModel):
-    name: Optional[str] = Field(None, alias="task_key")
-    description: Optional[str] = None
-    depends_on: Optional[List[DependentTask]] = None
-    pipeline_task: Optional[PipelineTask] = None
-    notebook_task: Optional[Dict[str, Any]] = None
-    spark_python_task: Optional[Dict[str, Any]] = None
-
-
 class DBTasks(BaseModel):
     name: Optional[str] = Field(None, alias="task_key")
     description: Optional[str] = None
     depends_on: Optional[List[DependentTask]] = None
     run_page_url: Optional[str] = None
+    pipeline_task: Optional[PipelineTask] = None
+    notebook_task: Optional[Dict[str, Any]] = None
+    spark_python_task: Optional[Dict[str, Any]] = None
 
 
 class DBSettings(BaseModel):
@@ -55,7 +49,7 @@ class DBSettings(BaseModel):
     description: Optional[str] = None
     schedule: Optional[DBRunSchedule] = None
     task_type: Optional[str] = Field(None, alias="format")
-    tasks: Optional[List[DBJobTask]] = None
+    tasks: Optional[List[DBTasks]] = None
 
 
 class DataBrickPipelineDetails(BaseModel):
diff --git a/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py b/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py
index 7333b846ec2..f23f85368fb 100644
--- a/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py
+++ b/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py
@@ -147,27 +147,27 @@ EXPECTED_CREATED_PIPELINES = CreatePipelineRequest(
     displayName="OpenMetadata Databricks Workflow",
     description="This job contain multiple tasks that are required to produce the weekly shark sightings report.",
     tasks=[
-        Task(
-            name="Orders_Ingest",
-            description="Ingests order data",
-            sourceUrl="https://my-workspace.cloud.databricks.com/#job/11223344/run/123",
-            downstreamTasks=[],
-            taskType="SINGLE_TASK",
-        ),
-        Task(
-            name="Match",
-            description="Matches orders with user sessions",
-            sourceUrl="https://my-workspace.cloud.databricks.com/#job/11223344/run/123",
-            downstreamTasks=["Orders_Ingested", "Sessionize"],
-            taskType="SINGLE_TASK",
-        ),
         Task(
             name="Sessionize",
             description="Extracts session data from events",
-            sourceUrl="https://my-workspace.cloud.databricks.com/#job/11223344/run/123",
+            sourceUrl="https://localhost:443/#job/11223344",
             downstreamTasks=[],
             taskType="SINGLE_TASK",
         ),
+        Task(
+            name="Orders_Ingest",
+            description="Ingests order data",
+            sourceUrl="https://localhost:443/#job/11223344",
+            downstreamTasks=[],
+            taskType="SINGLE_TASK",
+        ),
+        Task(
+            name="Matched_Changed",
+            description="Matches orders with user sessions",
+            sourceUrl="https://localhost:443/#job/11223344",
+            downstreamTasks=["Orders_Ingest", "Sessionize", "Sessionize_duplicated"],
+            taskType="SINGLE_TASK",
+        ),
     ],
     scheduleInterval="20 30 * * * ?",
     service=FullyQualifiedEntityName(root="databricks_pipeline_test"),
@@ -279,11 +279,7 @@ class DatabricksPipelineTests(TestCase):
         results = list(self.databricks.get_pipelines_list())
         self.assertEqual(PIPELINE_LIST, results)
 
-    @patch(
-        "metadata.ingestion.source.database.databricks.client.DatabricksClient.get_job_runs"
-    )
-    def test_yield_pipeline(self, get_job_runs):
-        get_job_runs.return_value = mock_run_data
+    def test_yield_pipeline(self):
         pipelines = list(self.databricks.yield_pipeline(PIPELINE_LIST[0]))[0].right
         self.assertEqual(pipelines, EXPECTED_CREATED_PIPELINES)
 
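
Note on the change: the rewritten get_tasks() no longer iterates over job run history; it builds task metadata directly from the settings.tasks block that the Databricks Jobs API already returns for each job, and derives one stable job-level URL from hostPort and job_id instead of per-run run_page_url values. Below is a minimal, self-contained sketch of that flow, assuming a Jobs-API-style payload. The model and field names mirror the diff, but the payload values and the build_task_summaries() helper are hypothetical illustrations, not the OpenMetadata implementation itself.

# Sketch of the reworked flow: tasks come from the job definition
# (settings.tasks), not from run history. Illustrative only.
from typing import List, Optional

from pydantic import BaseModel, Field


class DependentTask(BaseModel):
    name: Optional[str] = Field(None, alias="task_key")


class DBTasks(BaseModel):
    name: Optional[str] = Field(None, alias="task_key")
    description: Optional[str] = None
    depends_on: Optional[List[DependentTask]] = None


class DBSettings(BaseModel):
    task_type: Optional[str] = Field(None, alias="format")
    tasks: Optional[List[DBTasks]] = None


class DataBrickPipelineDetails(BaseModel):
    job_id: Optional[int] = None
    settings: Optional[DBSettings] = None


def build_task_summaries(details: DataBrickPipelineDetails, host_port: str):
    """Mirror the new get_tasks() logic: map settings.tasks to summaries."""
    if not details.settings or not details.settings.tasks:
        return None
    # One job-level URL replaces the per-run run_page_url used before.
    job_url = f"https://{host_port}/#job/{details.job_id}"
    return [
        {
            "name": str(task.name),
            "taskType": details.settings.task_type,
            "sourceUrl": job_url,
            "downstreamTasks": [dep.name for dep in task.depends_on or []],
        }
        for task in details.settings.tasks
    ]


# Sample payload with invented values, shaped like a Jobs API response.
sample = {
    "job_id": 11223344,
    "settings": {
        "format": "SINGLE_TASK",
        "tasks": [
            {"task_key": "Orders_Ingest", "description": "Ingests order data"},
            {
                "task_key": "Match",
                "depends_on": [{"task_key": "Orders_Ingest"}],
            },
        ],
    },
}

print(build_task_summaries(DataBrickPipelineDetails(**sample), "localhost:443"))
# -> two summaries, both with sourceUrl "https://localhost:443/#job/11223344"

Building the URL from the connection's hostPort and the job_id keeps sourceUrl stable across runs, which is what the updated unit-test expectations encode, and dropping the per-job get_job_runs call is why the test no longer patches DatabricksClient.get_job_runs.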