Fix #4189 - Fix airflow lineage callback (#4191)

Fix #4189 - Fix airflow lineage callback (#4191)
This commit is contained in:
Pere Miquel Brull 2022-04-18 15:13:26 +02:00 committed by GitHub
parent 3f4129d239
commit 2aecebd595
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 21 additions and 18 deletions

View File

@ -21,6 +21,10 @@ except ModuleNotFoundError:
from airflow.utils.dates import days_ago
from airflow_provider_openmetadata.lineage.callback import (
failure_callback,
success_callback,
)
from metadata.ingestion.api.workflow import Workflow
default_args = {
@ -30,6 +34,8 @@ default_args = {
"retries": 3,
"retry_delay": timedelta(seconds=10),
"execution_timeout": timedelta(minutes=60),
"on_failure_callback": failure_callback,
"on_success_callback": success_callback,
}
config = """

View File

@ -3,7 +3,7 @@
"pipeline": "sample_airflow.dim_address_etl",
"pipelineStatus": [
{
"executionDate": 1649582444,
"executionDate": 1649582444000,
"executionStatus": "Failed",
"taskStatus": [
{
@ -17,7 +17,7 @@
]
},
{
"executionDate": 1649669589,
"executionDate": 1649669589000,
"executionStatus": "Successful",
"taskStatus": [
{
@ -31,7 +31,7 @@
]
},
{
"executionDate": 1649669394,
"executionDate": 1649669394000,
"executionStatus": "Failed",
"taskStatus": [
{
@ -45,7 +45,7 @@
]
},
{
"executionDate": 1649669174,
"executionDate": 1649669174000,
"executionStatus": "Failed",
"taskStatus": [
{
@ -59,7 +59,7 @@
]
},
{
"executionDate": 1649669274,
"executionDate": 1649669274000,
"executionStatus": "Pending",
"taskStatus": [
{
@ -73,7 +73,7 @@
]
},
{
"executionDate": 1649669374,
"executionDate": 1649669374000,
"executionStatus": "Pending",
"taskStatus": [
{
@ -87,7 +87,7 @@
]
},
{
"executionDate": 1649669474,
"executionDate": 1649669474000,
"executionStatus": "Pending",
"taskStatus": [
{

View File

@ -13,6 +13,7 @@
OpenMetadata Airflow Lineage Backend
"""
import logging
import traceback
from typing import TYPE_CHECKING, Dict
from airflow_provider_openmetadata.lineage.config.loader import get_lineage_config
@ -61,6 +62,7 @@ def failure_callback(context: Dict[str, str]) -> None:
)
except Exception as exc: # pylint: disable=broad-except
logging.error(traceback.format_exc())
logging.error("Lineage Callback exception %s", exc)
@ -82,12 +84,12 @@ def success_callback(context: Dict[str, str]) -> None:
operator.log.info("Updating pipeline status on success...")
airflow_service_entity = metadata.get_by_name(
airflow_service_entity: PipelineService = metadata.get_by_name(
entity=PipelineService, fqdn=config.airflow_service_name
)
pipeline: Pipeline = metadata.get_by_name(
entity=Pipeline,
fqdn=f"{airflow_service_entity.name}.{dag.dag_id}",
fqdn=f"{airflow_service_entity.name.__root__}.{dag.dag_id}",
)
add_status(
@ -98,4 +100,5 @@ def success_callback(context: Dict[str, str]) -> None:
)
except Exception as exc: # pylint: disable=broad-except
logging.error(traceback.format_exc())
logging.error("Lineage Callback exception %s", exc)

View File

@ -180,7 +180,7 @@ def create_or_update_pipeline( # pylint: disable=too-many-locals
# Check if the pipeline already exists
operator.log.info(
f"Checking if the pipeline {airflow_service_entity.name}.{dag.dag_id} exists. If not, we will create it."
f"Checking if the pipeline {airflow_service_entity.name.__root__}.{dag.dag_id} exists. If not, we will create it."
)
current_pipeline: Pipeline = metadata.get_by_name(
entity=Pipeline,

View File

@ -195,9 +195,9 @@ def get_storage_service_or_create(service_json, metadata_config) -> StorageServi
def datetime_to_ts(date: datetime) -> int:
"""
Convert a given date to a timestamp as an Int
Convert a given date to a timestamp as an Int in milliseconds
"""
return int(date.timestamp())
return int(date.timestamp() * 1_000)
def _get_formmated_table_name(table_name):

View File

@ -204,7 +204,6 @@ class AirflowLineageTest(TestCase):
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
run_id="scheduled__2022-03-15T08:13:45.967068+00:00",
state="running",
),
},
@ -218,8 +217,6 @@ class AirflowLineageTest(TestCase):
entity=Pipeline, fqdn="local_airflow_3.lineage"
)
print(lineage)
nodes = {node["id"] for node in lineage["nodes"]}
self.assertIn(str(self.table.id.__root__), nodes)
@ -262,7 +259,6 @@ class AirflowLineageTest(TestCase):
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
run_id="scheduled__2022-03-15T08:13:45.967068+00:00",
state="running",
),
},
@ -278,7 +274,6 @@ class AirflowLineageTest(TestCase):
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
run_id="scheduled__2022-03-15T08:13:45.967068+00:00",
state="running",
),
},
@ -294,7 +289,6 @@ class AirflowLineageTest(TestCase):
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
run_id="scheduled__2022-03-15T08:13:45.967068+00:00",
state="running",
),
},