diff --git a/ingestion/examples/airflow/dags/airflow_sample_data.py b/ingestion/examples/airflow/dags/airflow_sample_data.py index 9820ea7a4df..553134589d3 100644 --- a/ingestion/examples/airflow/dags/airflow_sample_data.py +++ b/ingestion/examples/airflow/dags/airflow_sample_data.py @@ -21,6 +21,10 @@ except ModuleNotFoundError: from airflow.utils.dates import days_ago +from airflow_provider_openmetadata.lineage.callback import ( + failure_callback, + success_callback, +) from metadata.ingestion.api.workflow import Workflow default_args = { @@ -30,6 +34,8 @@ default_args = { "retries": 3, "retry_delay": timedelta(seconds=10), "execution_timeout": timedelta(minutes=60), + "on_failure_callback": failure_callback, + "on_success_callback": success_callback, } config = """ diff --git a/ingestion/examples/sample_data/pipelines/pipelineStatus.json b/ingestion/examples/sample_data/pipelines/pipelineStatus.json index 8ccbe117bbc..2ff0b0c5087 100644 --- a/ingestion/examples/sample_data/pipelines/pipelineStatus.json +++ b/ingestion/examples/sample_data/pipelines/pipelineStatus.json @@ -3,7 +3,7 @@ "pipeline": "sample_airflow.dim_address_etl", "pipelineStatus": [ { - "executionDate": 1649582444, + "executionDate": 1649582444000, "executionStatus": "Failed", "taskStatus": [ { @@ -17,7 +17,7 @@ ] }, { - "executionDate": 1649669589, + "executionDate": 164966958900, "executionStatus": "Successful", "taskStatus": [ { @@ -31,7 +31,7 @@ ] }, { - "executionDate": 1649669394, + "executionDate": 164966939400, "executionStatus": "Failed", "taskStatus": [ { @@ -45,7 +45,7 @@ ] }, { - "executionDate": 1649669174, + "executionDate": 164966917400, "executionStatus": "Failed", "taskStatus": [ { @@ -59,7 +59,7 @@ ] }, { - "executionDate": 1649669274, + "executionDate": 164966927400, "executionStatus": "Pending", "taskStatus": [ { @@ -73,7 +73,7 @@ ] }, { - "executionDate": 1649669374, + "executionDate": 164966937400, "executionStatus": "Pending", "taskStatus": [ { @@ -87,7 +87,7 @@ ] }, { - "executionDate": 1649669474, + "executionDate": 164966947400, "executionStatus": "Pending", "taskStatus": [ { diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/callback.py b/ingestion/src/airflow_provider_openmetadata/lineage/callback.py index c1a1c4c4298..d03510ec1cf 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/callback.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/callback.py @@ -13,6 +13,7 @@ OpenMetadata Airflow Lineage Backend """ import logging +import traceback from typing import TYPE_CHECKING, Dict from airflow_provider_openmetadata.lineage.config.loader import get_lineage_config @@ -61,6 +62,7 @@ def failure_callback(context: Dict[str, str]) -> None: ) except Exception as exc: # pylint: disable=broad-except + logging.error(traceback.format_exc()) logging.error("Lineage Callback exception %s", exc) @@ -82,12 +84,12 @@ def success_callback(context: Dict[str, str]) -> None: operator.log.info("Updating pipeline status on success...") - airflow_service_entity = metadata.get_by_name( + airflow_service_entity: PipelineService = metadata.get_by_name( entity=PipelineService, fqdn=config.airflow_service_name ) pipeline: Pipeline = metadata.get_by_name( entity=Pipeline, - fqdn=f"{airflow_service_entity.name}.{dag.dag_id}", + fqdn=f"{airflow_service_entity.name.__root__}.{dag.dag_id}", ) add_status( @@ -98,4 +100,5 @@ def success_callback(context: Dict[str, str]) -> None: ) except Exception as exc: # pylint: disable=broad-except + logging.error(traceback.format_exc()) logging.error("Lineage Callback exception %s", exc) diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py index 88b35e52db4..721faa32cd6 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py @@ -180,7 +180,7 @@ def create_or_update_pipeline( # pylint: disable=too-many-locals # Check if the pipeline already exists operator.log.info( - f"Checking if the pipeline {airflow_service_entity.name}.{dag.dag_id} exists. If not, we will create it." + f"Checking if the pipeline {airflow_service_entity.name.__root__}.{dag.dag_id} exists. If not, we will create it." ) current_pipeline: Pipeline = metadata.get_by_name( entity=Pipeline, diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py index ef5a55d04b3..bc55cad40df 100644 --- a/ingestion/src/metadata/utils/helpers.py +++ b/ingestion/src/metadata/utils/helpers.py @@ -195,9 +195,9 @@ def get_storage_service_or_create(service_json, metadata_config) -> StorageServi def datetime_to_ts(date: datetime) -> int: """ - Convert a given date to a timestamp as an Int + Convert a given date to a timestamp as an Int in milliseconds """ - return int(date.timestamp()) + return int(date.timestamp() * 1_000) def _get_formmated_table_name(table_name): diff --git a/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py b/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py index 8f6d4668322..10a8bef960c 100644 --- a/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py +++ b/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py @@ -204,7 +204,6 @@ class AirflowLineageTest(TestCase): execution_date=datetime.strptime( "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S" ), - run_id="scheduled__2022-03-15T08:13:45.967068+00:00", state="running", ), }, @@ -218,8 +217,6 @@ class AirflowLineageTest(TestCase): entity=Pipeline, fqdn="local_airflow_3.lineage" ) - print(lineage) - nodes = {node["id"] for node in lineage["nodes"]} self.assertIn(str(self.table.id.__root__), nodes) @@ -262,7 +259,6 @@ class AirflowLineageTest(TestCase): execution_date=datetime.strptime( "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S" ), - run_id="scheduled__2022-03-15T08:13:45.967068+00:00", state="running", ), }, @@ -278,7 +274,6 @@ class AirflowLineageTest(TestCase): execution_date=datetime.strptime( "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S" ), - run_id="scheduled__2022-03-15T08:13:45.967068+00:00", state="running", ), }, @@ -294,7 +289,6 @@ class AirflowLineageTest(TestCase): execution_date=datetime.strptime( "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S" ), - run_id="scheduled__2022-03-15T08:13:45.967068+00:00", state="running", ), },