From 05c8d1ebb68cd5be76761af1ef9c01cd9017ff49 Mon Sep 17 00:00:00 2001
From: Pere Miquel Brull
Date: Mon, 24 Jan 2022 21:29:16 +0100
Subject: [PATCH] Fix #2153 - Lineage date parsing (#2387)
* Check for start_date
* Fix date keys and remove repr
* Add date management tests
---
.../lineage/utils.py | 67 +++++++++++++------
.../lineage/airflow/test_airflow_lineage.py | 35 +++++++++-
2 files changed, 81 insertions(+), 21 deletions(-)
diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py
index 532bf019c0f..007335fd925 100644
--- a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py
+++ b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py
@@ -13,9 +13,8 @@
OpenMetadata Airflow Lineage Backend
"""
-import ast
import traceback
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Set, Union
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Set, Tuple, Union
from airflow.configuration import conf
@@ -98,13 +97,11 @@ def get_properties(
:return: properties dict
"""
- props: Dict[str, str] = {
- key: repr(value) for (key, value) in serializer(obj).items()
- }
+ props: Dict[str, str] = {key: value for (key, value) in serializer(obj).items()}
for key in obj.get_serialized_fields():
if key not in props:
- props[key] = repr(getattr(obj, key))
+ props[key] = getattr(obj, key)
return {key: value for (key, value) in props.items() if key in allowed_keys}
@@ -136,6 +133,46 @@ def get_xlets(
return None
+def iso_dag_start_date(props: Dict[str, Any]) -> Optional[str]:
+ """
+ Given a properties dict, return the start_date
+ as an ISO string if start_date is present
+ :param props: properties dict
+ :return: iso start_date or None
+ """
+
+ # DAG start date comes as `float`
+ if props.get("start_date"):
+ return convert_epoch_to_iso(int(float(props["start_date"])))
+
+ return None
+
+
+def iso_task_start_end_date(
+ props: Dict[str, Any]
+) -> Tuple[Optional[str], Optional[str]]:
+ """
+ Given the attributes of a Task Instance, return
+ the task start date and task end date in
+ ISO format
+ :param props: task instance attributes
+ :return: task start and end date
+ """
+
+ task_start_date = (
+ convert_epoch_to_iso(int(props["start_date"].timestamp()))
+ if props.get("start_date")
+ else None
+ )
+ task_end_date = (
+ convert_epoch_to_iso(int(props["end_date"].timestamp()))
+ if props.get("end_date")
+ else None
+ )
+
+ return task_start_date, task_end_date
+
+
def create_pipeline_entity(
dag_properties: Dict[str, str],
task_properties: Dict[str, str],
@@ -161,25 +198,15 @@ def create_pipeline_entity(
f"{pipeline_service_url}/taskinstance/list/"
+ f"?flt1_dag_id_equals={dag.dag_id}&_flt_3_task_id={operator.task_id}"
)
- dag_start_date = convert_epoch_to_iso(int(float(dag_properties["start_date"])))
+ dag_start_date = iso_dag_start_date(dag_properties)
+ task_start_date, task_end_date = iso_task_start_end_date(task_properties)
downstream_tasks = []
- if "_downstream_task_ids" in task_properties:
- downstream_tasks = ast.literal_eval(task_properties["_downstream_task_ids"])
+ if task_properties.get("_downstream_task_ids"):
+ downstream_tasks = task_properties["_downstream_task_ids"]
operator.log.info(f"downstream tasks {downstream_tasks}")
- task_start_date = (
- task_properties["start_date"].isoformat()
- if "start_time" in task_properties
- else None
- )
- task_end_date = (
- task_properties["end_date"].isoformat()
- if "end_time" in task_properties
- else None
- )
-
task = Task(
name=task_properties["task_id"],
displayName=task_properties.get("label"), # v1.10.15 does not have label
diff --git a/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py b/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py
index 22b9e08da83..389b3871e85 100644
--- a/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py
+++ b/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py
@@ -14,7 +14,6 @@ Test airflow lineage backend
"""
from datetime import datetime, timedelta
-from textwrap import dedent
from unittest import TestCase
# The DAG object; we'll need this to instantiate a DAG
@@ -32,6 +31,10 @@ from airflow_provider_openmetadata.lineage.openmetadata import (
get_properties,
get_xlets,
)
+from airflow_provider_openmetadata.lineage.utils import (
+ iso_dag_start_date,
+ iso_task_start_end_date,
+)
from metadata.generated.schema.api.data.createDatabase import (
CreateDatabaseEntityRequest,
)
@@ -169,6 +172,36 @@ class AirflowLineageTest(TestCase):
)
self.assertTrue(set(task3_props.keys()).issubset(ALLOWED_TASK_KEYS))
+ def test_times(self):
+ """
+ Check the ISO date extraction for DAG and Tasks instances
+ """
+ dag_props = get_properties(
+ self.dag, SerializedDAG.serialize_dag, ALLOWED_FLOW_KEYS
+ )
+
+ dag_date = iso_dag_start_date(dag_props)
+ self.assertEqual("2021-01-01T00:00:00Z", dag_date)
+
+ # Remove the start_date
+ dag_props.pop("start_date")
+ dag_none_date = iso_dag_start_date(dag_props)
+ self.assertIsNone(dag_none_date)
+
+ # By default we'll get the start_date for the task,
+ # so we can check its value, but the end date
+ # may be missing, as in this case.
+ # Check that a missing date is returned as None
+ task1_props = get_properties(
+ self.dag.get_task("task1"),
+ SerializedBaseOperator.serialize_operator,
+ ALLOWED_TASK_KEYS,
+ )
+
+ task_start_date, task_end_date = iso_task_start_end_date(task1_props)
+ self.assertEqual("2021-01-01T00:00:00Z", task_start_date)
+ self.assertIsNone(task_end_date)
+
def test_lineage(self):
"""
Test end to end