Domo Pipeline - Use pipeline ID with display name (#8374)

* Use pipeline ID with display name

* handle id

* lint

* update test for ID
This commit is contained in:
Pere Miquel Brull 2022-10-26 21:22:26 +02:00 committed by GitHub
parent cd3a2abff0
commit 6d404ccdc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 17 deletions

View File

@ -78,15 +78,16 @@ class DomopipelineSource(PipelineServiceSource):
def yield_pipeline(self, pipeline_details) -> Iterable[CreatePipelineRequest]:
try:
pipeline_name = pipeline_details["name"]
pipeline_name = pipeline_details["id"]
task = Task(
name=pipeline_name,
displayName=pipeline_name,
displayName=pipeline_details.get("name"),
description=pipeline_details.get("description", ""),
)
pipeline_yield = CreatePipelineRequest(
name=pipeline_name,
displayName=pipeline_details.get("name"),
description=pipeline_details.get("description", ""),
tasks=[task],
service=EntityReference(
@ -118,9 +119,17 @@ class DomopipelineSource(PipelineServiceSource):
return
def yield_pipeline_status(self, pipeline_details) -> OMetaPipelineStatus:
pipeline_name = pipeline_details["name"]
runs = self.client.get_runs(pipeline_details["id"])
pipeline_id = pipeline_details.get("id")
if not pipeline_id:
logger.debug(
f"Could not extract ID from {pipeline_details} while getting status."
)
return None
runs = self.client.get_runs(pipeline_id)
try:
for run in runs or []:
start_time = run["beginTime"] // 1000 if run.get("beginTime") else None
@ -128,7 +137,7 @@ class DomopipelineSource(PipelineServiceSource):
run_state = run.get("state", "Pending")
task_status = TaskStatus(
name=pipeline_name,
name=pipeline_id,
executionStatus=STATUS_MAP.get(
run_state.lower(), StatusType.Pending.value
),
@ -149,9 +158,11 @@ class DomopipelineSource(PipelineServiceSource):
pipeline_status=pipeline_status,
)
except KeyError as err:
logger.error(f"Error extracting status data for {pipeline_name} - {err}")
logger.error(f"Error extracting status data for {pipeline_id} - {err}")
logger.debug(traceback.format_exc())
except Exception as err:
logger.error(f"Wild error extracting status for {pipeline_name} - {err}")
logger.error(f"Wild error extracting status for {pipeline_id} - {err}")
logger.debug(traceback.format_exc())
return None

View File

@ -43,7 +43,7 @@ MOCK_PIPELINE_SERVICE = PipelineService(
MOCK_PIPELINE = Pipeline(
id="a58b1856-729c-493b-bc87-6d2269b43ec0",
name="do_it_all_with_default_config",
fullyQualifiedName="local_domo_pipeline.Nihar Dataflows",
fullyQualifiedName="local_domo_pipeline.1",
displayName="do_it_all_with_default_config",
description="",
tasks=[
@ -97,13 +97,13 @@ mock_domopipeline_config = {
EXPECTED_PIPELINE_STATUS = [
OMetaPipelineStatus(
pipeline_fqn="local_domo_pipeline.Nihar Dataflows",
pipeline_fqn="local_domo_pipeline.1",
pipeline_status=PipelineStatus(
timestamp=1665476792,
executionStatus="Successful",
taskStatus=[
TaskStatus(
name="Nihar Dataflows",
name="1",
executionStatus="Successful",
startTime=1665476783,
endTime=1665476792,
@ -113,13 +113,13 @@ EXPECTED_PIPELINE_STATUS = [
),
),
OMetaPipelineStatus(
pipeline_fqn="local_domo_pipeline.Nihar Dataflows",
pipeline_fqn="local_domo_pipeline.1",
pipeline_status=PipelineStatus(
timestamp=1665470252,
executionStatus="Successful",
taskStatus=[
TaskStatus(
name="Nihar Dataflows",
name="1",
executionStatus="Successful",
startTime=1665470244,
endTime=1665470252,
@ -129,13 +129,13 @@ EXPECTED_PIPELINE_STATUS = [
),
),
OMetaPipelineStatus(
pipeline_fqn="local_domo_pipeline.Nihar Dataflows",
pipeline_fqn="local_domo_pipeline.1",
pipeline_status=PipelineStatus(
timestamp=1665148827,
executionStatus="Successful",
taskStatus=[
TaskStatus(
name="Nihar Dataflows",
name="1",
executionStatus="Successful",
startTime=1665148818,
endTime=1665148827,
@ -148,8 +148,8 @@ EXPECTED_PIPELINE_STATUS = [
EXPECTED_PIPELINE = [
CreatePipelineRequest(
name="Nihar Dataflows",
displayName=None,
name="1",
displayName="Nihar Dataflows",
description="THis is description for Nihar dataflow",
pipelineUrl=None,
concurrency=None,
@ -157,7 +157,7 @@ EXPECTED_PIPELINE = [
startDate=datetime(2022, 10, 7, 13, 20, 16, tzinfo=timezone.utc),
tasks=[
Task(
name="Nihar Dataflows",
name="1",
displayName="Nihar Dataflows",
fullyQualifiedName=None,
description="THis is description for Nihar dataflow",