Mirror of https://github.com/open-metadata/OpenMetadata.git (synced 2025-10-19 12:50:20 +00:00)
* Check for start_date
* Fix date keys and remove repr
* Add date management tests
This commit is contained in:
parent 6a8fc9e0fb
commit 05c8d1ebb6
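The common thread in these changes: get_properties used to store repr(value) for every attribute, which forces consumers to round-trip values through ast.literal_eval and breaks for dates, since repr() of a datetime is not a Python literal. A minimal illustration in plain Python (sample values only, not code from this commit):

import ast
from datetime import datetime

# Before this commit, every property value was stored as repr(value):
downstream = repr(["task2", "task3"])      # "['task2', 'task3']"
ast.literal_eval(downstream)               # an eval round-trip just to get the list back

# repr() is not literal-evaluable for datetimes, which is what broke the date keys:
end_date = repr(datetime(2021, 1, 1))      # 'datetime.datetime(2021, 1, 1, 0, 0)'
# ast.literal_eval(end_date) would raise ValueError: malformed node or string

# After the commit, raw values are kept: lists stay lists, datetimes keep .timestamp()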
@@ -13,9 +13,8 @@
 OpenMetadata Airflow Lineage Backend
 """
 
-import ast
 import traceback
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Set, Union
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Set, Tuple, Union
 
 from airflow.configuration import conf
 
@@ -98,13 +97,11 @@ def get_properties(
     :return: properties dict
     """
 
-    props: Dict[str, str] = {
-        key: repr(value) for (key, value) in serializer(obj).items()
-    }
+    props: Dict[str, str] = {key: value for (key, value) in serializer(obj).items()}
 
     for key in obj.get_serialized_fields():
         if key not in props:
-            props[key] = repr(getattr(obj, key))
+            props[key] = getattr(obj, key)
 
     return {key: value for (key, value) in props.items() if key in allowed_keys}
 
@@ -136,6 +133,46 @@ def get_xlets(
     return None
 
 
+def iso_dag_start_date(props: Dict[str, Any]) -> Optional[str]:
+    """
+    Given a properties dict, return the start_date
+    as an ISO string if start_date is provided
+    :param props: properties dict
+    :return: ISO start_date or None
+    """
+
+    # DAG start date comes as `float`
+    if props.get("start_date"):
+        return convert_epoch_to_iso(int(float(props["start_date"])))
+
+    return None
+
+
+def iso_task_start_end_date(
+    props: Dict[str, Any]
+) -> Tuple[Optional[str], Optional[str]]:
+    """
+    Given the attributes of a Task Instance, return
+    the task start date and task end date in
+    ISO format
+    :param props: task instance attributes
+    :return: task start and end date
+    """
+
+    task_start_date = (
+        convert_epoch_to_iso(int(props["start_date"].timestamp()))
+        if props.get("start_date")
+        else None
+    )
+    task_end_date = (
+        convert_epoch_to_iso(int(props["end_date"].timestamp()))
+        if props.get("end_date")
+        else None
+    )
+
+    return task_start_date, task_end_date
+
+
 def create_pipeline_entity(
     dag_properties: Dict[str, str],
     task_properties: Dict[str, str],
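A minimal sketch of how the two new helpers behave. The stand-in convert_epoch_to_iso below is hypothetical; it only assumes the real helper turns an epoch int into a Zulu ISO-8601 string, which matches the test expectations further down:

from datetime import datetime, timezone

def convert_epoch_to_iso(epoch: int) -> str:
    # Hypothetical stand-in for the metadata helper the real module imports
    return datetime.fromtimestamp(epoch, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

# DAG properties serialize start_date as a float epoch:
print(convert_epoch_to_iso(int(float(1609459200.0))))    # 2021-01-01T00:00:00Z

# Task Instance properties carry datetime objects, hence the .timestamp() path:
dt = datetime(2021, 1, 1, tzinfo=timezone.utc)
print(convert_epoch_to_iso(int(dt.timestamp())))         # 2021-01-01T00:00:00Z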
@@ -161,25 +198,15 @@ def create_pipeline_entity(
         f"{pipeline_service_url}/taskinstance/list/"
         + f"?flt1_dag_id_equals={dag.dag_id}&_flt_3_task_id={operator.task_id}"
     )
-    dag_start_date = convert_epoch_to_iso(int(float(dag_properties["start_date"])))
+    dag_start_date = iso_dag_start_date(dag_properties)
+    task_start_date, task_end_date = iso_task_start_end_date(task_properties)
 
     downstream_tasks = []
-    if "_downstream_task_ids" in task_properties:
-        downstream_tasks = ast.literal_eval(task_properties["_downstream_task_ids"])
+    if task_properties.get("_downstream_task_ids"):
+        downstream_tasks = task_properties["_downstream_task_ids"]
 
     operator.log.info(f"downstream tasks {downstream_tasks}")
 
-    task_start_date = (
-        task_properties["start_date"].isoformat()
-        if "start_time" in task_properties
-        else None
-    )
-    task_end_date = (
-        task_properties["end_date"].isoformat()
-        if "end_time" in task_properties
-        else None
-    )
-
     task = Task(
         name=task_properties["task_id"],
         displayName=task_properties.get("label"),  # v1.10.15 does not have label
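With repr() gone, _downstream_task_ids arrives as an actual list, so the ast.literal_eval round-trip above is no longer needed; the switch to .get() also treats an empty list the same as a missing key. A small sketch with sample data only:

# Sample data: values are no longer repr() strings after this commit
task_properties = {"_downstream_task_ids": ["task2", "task3"]}

downstream_tasks = []
if task_properties.get("_downstream_task_ids"):    # falsy for None and for []
    downstream_tasks = task_properties["_downstream_task_ids"]

print(downstream_tasks)                            # ['task2', 'task3']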
@@ -14,7 +14,6 @@ Test airflow lineage backend
 """
 
 from datetime import datetime, timedelta
-from textwrap import dedent
 from unittest import TestCase
 
 # The DAG object; we'll need this to instantiate a DAG
@@ -32,6 +31,10 @@ from airflow_provider_openmetadata.lineage.openmetadata import (
     get_properties,
     get_xlets,
 )
+from airflow_provider_openmetadata.lineage.utils import (
+    iso_dag_start_date,
+    iso_task_start_end_date,
+)
 from metadata.generated.schema.api.data.createDatabase import (
     CreateDatabaseEntityRequest,
 )
@@ -169,6 +172,36 @@ class AirflowLineageTest(TestCase):
         )
         self.assertTrue(set(task3_props.keys()).issubset(ALLOWED_TASK_KEYS))
 
+    def test_times(self):
+        """
+        Check the ISO date extraction for DAG and Task instances
+        """
+        dag_props = get_properties(
+            self.dag, SerializedDAG.serialize_dag, ALLOWED_FLOW_KEYS
+        )
+
+        dag_date = iso_dag_start_date(dag_props)
+        self.assertEqual("2021-01-01T00:00:00Z", dag_date)
+
+        # Remove the start_date
+        dag_props.pop("start_date")
+        dag_none_date = iso_dag_start_date(dag_props)
+        self.assertIsNone(dag_none_date)
+
+        # By default we'll get the start_date for the task,
+        # so we can check its value, but the end date
+        # might not come, as in this case.
+        # Check that we can have those values as None
+        task1_props = get_properties(
+            self.dag.get_task("task1"),
+            SerializedBaseOperator.serialize_operator,
+            ALLOWED_TASK_KEYS,
+        )
+
+        task_start_date, task_end_date = iso_task_start_end_date(task1_props)
+        self.assertEqual("2021-01-01T00:00:00Z", task_start_date)
+        self.assertIsNone(task_end_date)
+
     def test_lineage(self):
         """
         Test end to end