From 6d404ccdc1c58bac176baccec6cb3d5200677256 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Wed, 26 Oct 2022 21:22:26 +0200 Subject: [PATCH] Domo Pipeline - Use pipeline ID with display name (#8374) * Use pipeline ID with display name * handle id * lint * update test for ID --- .../ingestion/source/pipeline/domopipeline.py | 25 +++++++++++++------ .../topology/pipeline/test_domopipeline.py | 20 +++++++-------- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/domopipeline.py b/ingestion/src/metadata/ingestion/source/pipeline/domopipeline.py index 24f65fdbb6c..8ff865a3af5 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/domopipeline.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/domopipeline.py @@ -78,15 +78,16 @@ class DomopipelineSource(PipelineServiceSource): def yield_pipeline(self, pipeline_details) -> Iterable[CreatePipelineRequest]: try: - pipeline_name = pipeline_details["name"] + pipeline_name = pipeline_details["id"] task = Task( name=pipeline_name, - displayName=pipeline_name, + displayName=pipeline_details.get("name"), description=pipeline_details.get("description", ""), ) pipeline_yield = CreatePipelineRequest( name=pipeline_name, + displayName=pipeline_details.get("name"), description=pipeline_details.get("description", ""), tasks=[task], service=EntityReference( @@ -118,9 +119,17 @@ class DomopipelineSource(PipelineServiceSource): return def yield_pipeline_status(self, pipeline_details) -> OMetaPipelineStatus: - pipeline_name = pipeline_details["name"] - runs = self.client.get_runs(pipeline_details["id"]) + + pipeline_id = pipeline_details.get("id") + if not pipeline_id: + logger.debug( + f"Could not extract ID from {pipeline_details} while getting status." + ) + return None + + runs = self.client.get_runs(pipeline_id) try: + for run in runs or []: start_time = run["beginTime"] // 1000 if run.get("beginTime") else None @@ -128,7 +137,7 @@ class DomopipelineSource(PipelineServiceSource): run_state = run.get("state", "Pending") task_status = TaskStatus( - name=pipeline_name, + name=pipeline_id, executionStatus=STATUS_MAP.get( run_state.lower(), StatusType.Pending.value ), @@ -149,9 +158,11 @@ class DomopipelineSource(PipelineServiceSource): pipeline_status=pipeline_status, ) except KeyError as err: - logger.error(f"Error extracting status data for {pipeline_name} - {err}") + logger.error(f"Error extracting status data for {pipeline_id} - {err}") logger.debug(traceback.format_exc()) except Exception as err: - logger.error(f"Wild error extracting status for {pipeline_name} - {err}") + logger.error(f"Wild error extracting status for {pipeline_id} - {err}") logger.debug(traceback.format_exc()) + + return None diff --git a/ingestion/tests/unit/topology/pipeline/test_domopipeline.py b/ingestion/tests/unit/topology/pipeline/test_domopipeline.py index a8ea8be522f..49b50559ac3 100644 --- a/ingestion/tests/unit/topology/pipeline/test_domopipeline.py +++ b/ingestion/tests/unit/topology/pipeline/test_domopipeline.py @@ -43,7 +43,7 @@ MOCK_PIPELINE_SERVICE = PipelineService( MOCK_PIPELINE = Pipeline( id="a58b1856-729c-493b-bc87-6d2269b43ec0", name="do_it_all_with_default_config", - fullyQualifiedName="local_domo_pipeline.Nihar Dataflows", + fullyQualifiedName="local_domo_pipeline.1", displayName="do_it_all_with_default_config", description="", tasks=[ @@ -97,13 +97,13 @@ mock_domopipeline_config = { EXPECTED_PIPELINE_STATUS = [ OMetaPipelineStatus( - pipeline_fqn="local_domo_pipeline.Nihar Dataflows", + pipeline_fqn="local_domo_pipeline.1", pipeline_status=PipelineStatus( timestamp=1665476792, executionStatus="Successful", taskStatus=[ TaskStatus( - name="Nihar Dataflows", + name="1", executionStatus="Successful", startTime=1665476783, endTime=1665476792, @@ -113,13 +113,13 @@ EXPECTED_PIPELINE_STATUS = [ ), ), OMetaPipelineStatus( - pipeline_fqn="local_domo_pipeline.Nihar Dataflows", + pipeline_fqn="local_domo_pipeline.1", pipeline_status=PipelineStatus( timestamp=1665470252, executionStatus="Successful", taskStatus=[ TaskStatus( - name="Nihar Dataflows", + name="1", executionStatus="Successful", startTime=1665470244, endTime=1665470252, @@ -129,13 +129,13 @@ EXPECTED_PIPELINE_STATUS = [ ), ), OMetaPipelineStatus( - pipeline_fqn="local_domo_pipeline.Nihar Dataflows", + pipeline_fqn="local_domo_pipeline.1", pipeline_status=PipelineStatus( timestamp=1665148827, executionStatus="Successful", taskStatus=[ TaskStatus( - name="Nihar Dataflows", + name="1", executionStatus="Successful", startTime=1665148818, endTime=1665148827, @@ -148,8 +148,8 @@ EXPECTED_PIPELINE_STATUS = [ EXPECTED_PIPELINE = [ CreatePipelineRequest( - name="Nihar Dataflows", - displayName=None, + name="1", + displayName="Nihar Dataflows", description="THis is description for Nihar dataflow", pipelineUrl=None, concurrency=None, @@ -157,7 +157,7 @@ EXPECTED_PIPELINE = [ startDate=datetime(2022, 10, 7, 13, 20, 16, tzinfo=timezone.utc), tasks=[ Task( - name="Nihar Dataflows", + name="1", displayName="Nihar Dataflows", fullyQualifiedName=None, description="THis is description for Nihar dataflow",