diff --git a/Makefile b/Makefile
index 95954ebd89b..5266acf9ffe 100644
--- a/Makefile
+++ b/Makefile
@@ -69,7 +69,7 @@ generate:  ## Generate the pydantic models from the JSON Schemas to the ingestio
 ## Ingestion tests & QA
 .PHONY: run_ometa_integration_tests
 run_ometa_integration_tests:  ## Run Python integration tests
-	coverage run --rcfile ingestion/.coveragerc -a --branch -m pytest -c ingestion/setup.cfg --junitxml=ingestion/junit/test-results-integration.xml ingestion/tests/integration/ometa ingestion/tests/integration/orm_profiler ingestion/tests/integration/test_suite ingestion/tests/integration/data_insight
+	coverage run --rcfile ingestion/.coveragerc -a --branch -m pytest -c ingestion/setup.cfg --junitxml=ingestion/junit/test-results-integration.xml ingestion/tests/integration/ometa ingestion/tests/integration/orm_profiler ingestion/tests/integration/test_suite ingestion/tests/integration/data_insight ingestion/tests/integration/lineage
 
 .PHONY: unit_ingestion
 unit_ingestion:  ## Run Python unit tests
diff --git a/ingestion/examples/airflow/dags/airflow_lineage_operator.py b/ingestion/examples/airflow/dags/airflow_lineage_operator.py
index dfae8f4a2d2..4c02496434f 100644
--- a/ingestion/examples/airflow/dags/airflow_lineage_operator.py
+++ b/ingestion/examples/airflow/dags/airflow_lineage_operator.py
@@ -9,12 +9,16 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 """
-You can run this DAG from the default OM installation
-"""
+You can run this DAG from the default OM installation.
 
-from datetime import datetime, timedelta
+For this DAG to run properly, we expect an OpenMetadata
+Airflow connection named `openmetadata_conn_id`.
+"""
+from datetime import datetime
 from textwrap import dedent
 
+import requests
+
 # The DAG object; we'll need this to instantiate a DAG
 from airflow import DAG
 
@@ -22,31 +26,54 @@ from airflow import DAG
 from airflow.operators.bash import BashOperator
 from airflow.operators.python import PythonOperator
 
+from airflow_provider_openmetadata.hooks.openmetadata import OpenMetadataHook
+
 # These args will get passed on to each operator
 # You can override them on a per-task basis during operator initialization
 from airflow_provider_openmetadata.lineage.operator import OpenMetadataLineageOperator
-from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
-    OpenMetadataConnection,
-)
-from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
-    OpenMetadataJWTClientConfig,
-)
 
-default_args = {
-    "retries": 1,
-    "retry_delay": timedelta(minutes=5),
+OM_HOST_PORT = "http://localhost:8585/api"
+OM_JWT = "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
+AIRFLOW_HOST_API_ROOT = "http://localhost:8080/api/v1/"
+DEFAULT_OM_AIRFLOW_CONNECTION = "openmetadata_conn_id"
+DEFAULT_AIRFLOW_HEADERS = {
+    "Content-Type": "application/json",
+    "Authorization": "Basic YWRtaW46YWRtaW4=",
 }
 
+default_args = {
+    "retries": 0,
+}
 
-def explode():
-    raise Exception("Oh no!")
 
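+# NOTE: creating the connection at import time keeps this example self-contained;
+# in a real deployment you would provision it once via the Airflow UI or CLI.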
+# Create the default OpenMetadata Airflow Connection (if it does not exist)
+res = requests.get(
+    AIRFLOW_HOST_API_ROOT + f"connections/{DEFAULT_OM_AIRFLOW_CONNECTION}",
+    headers=DEFAULT_AIRFLOW_HEADERS,
+)
+if res.status_code == 404:  # not found
+    requests.post(
+        AIRFLOW_HOST_API_ROOT + "connections",
+        json={
+            "connection_id": DEFAULT_OM_AIRFLOW_CONNECTION,
+            "conn_type": "openmetadata",
+            "host": "openmetadata-server",
+            "schema": "http",
+            "port": 8585,
+            "password": OM_JWT,
+        },
+        headers=DEFAULT_AIRFLOW_HEADERS,
+    )
+
+elif res.status_code != 200:
+    raise RuntimeError(f"Could not fetch {DEFAULT_OM_AIRFLOW_CONNECTION} connection")
 
 with DAG(
     "lineage_tutorial_operator",
     default_args=default_args,
     description="A simple tutorial DAG",
-    schedule_interval=timedelta(days=1),
+    schedule_interval=None,
+    is_paused_upon_creation=True,
     start_date=datetime(2021, 1, 1),
     catchup=False,
     tags=["example"],
@@ -56,7 +83,11 @@ with DAG(
     t1 = BashOperator(
         task_id="print_date",
         bash_command="date",
-        outlets={"tables": ["sample_data.ecommerce_db.shopify.dim_address"]},
+        outlets={
+            "tables": [
+                "test-service-table-lineage.test-db.test-schema.lineage-test-outlet"
+            ]
+        },
     )
 
     t2 = BashOperator(
@@ -64,14 +95,11 @@
         depends_on_past=False,
         bash_command="sleep 1",
         retries=3,
-        inlets={"tables": ["sample_data.ecommerce_db.shopify.dim_customer"]},
-    )
-
-    risen = PythonOperator(
-        task_id="explode",
-        provide_context=True,
-        python_callable=explode,
-        retries=0,
+        inlets={
+            "tables": [
+                "test-service-table-lineage.test-db.test-schema.lineage-test-inlet"
+            ]
+        },
     )
 
     dag.doc_md = (
@@ -99,20 +127,12 @@ with DAG(
 
     t1 >> [t2, t3]
 
-    server_config = OpenMetadataConnection(
-        hostPort="http://localhost:8585/api",
-        authProvider="openmetadata",
-        securityConfig=OpenMetadataJWTClientConfig(
-            jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
-        ),
-    )
-
     t4 = OpenMetadataLineageOperator(
         task_id="lineage_op",
         depends_on_past=False,
-        server_config=server_config,
+        server_config=OpenMetadataHook(DEFAULT_OM_AIRFLOW_CONNECTION).get_conn(),
         service_name="airflow_lineage_op_service",
         only_keep_dag_lineage=True,
     )
 
-    t1 >> t4
+    [t1, t2, t3] >> t4
diff --git a/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py b/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py
index 2e87eb7a779..80e7c0fbd9e 100644
--- a/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py
+++ b/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py
@@ -10,26 +10,18 @@
 #  limitations under the License.
 """
-Test airflow lineage backend
+Test airflow lineage operator and hook.
 
-These tests should be run with Airflow 2.1.4
-Other airflow versions require a different way to
-mock the DAG and Task runs.
+This test is coupled with the example DAG `lineage_tutorial_operator`.
 """
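+
+# NOTE: the tests below are inter-dependent; they run in sequence through the
+# `pytest.mark.order` marker, provided by the pytest-order plugin.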
-
-import os
+import time
 from datetime import datetime, timedelta
-from unittest import TestCase, mock
+from typing import Optional
+from unittest import TestCase
 
-# The DAG object; we'll need this to instantiate a DAG
-from airflow import DAG
-from airflow.models import TaskInstance
-from airflow.operators.bash import BashOperator
-from airflow.operators.dummy import DummyOperator
-from airflow.utils.task_group import TaskGroup
+import pytest
+import requests
 
-from airflow_provider_openmetadata.lineage.backend import OpenMetadataLineageBackend
-from airflow_provider_openmetadata.lineage.utils import get_xlets
 from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
 from metadata.generated.schema.api.data.createDatabaseSchema import (
     CreateDatabaseSchemaRequest,
@@ -38,7 +30,7 @@ from metadata.generated.schema.api.data.createTable import CreateTableRequest
 from metadata.generated.schema.api.services.createDatabaseService import (
     CreateDatabaseServiceRequest,
 )
-from metadata.generated.schema.entity.data.pipeline import Pipeline
+from metadata.generated.schema.entity.data.pipeline import Pipeline, StatusType
 from metadata.generated.schema.entity.data.table import Column, DataType
 from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
     MysqlConnection,
@@ -51,24 +43,50 @@ from metadata.generated.schema.entity.services.databaseService import (
     DatabaseService,
     DatabaseServiceType,
 )
+from metadata.generated.schema.entity.services.pipelineService import PipelineService
 from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
     OpenMetadataJWTClientConfig,
 )
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 
+OM_HOST_PORT = "http://localhost:8585/api"
+OM_JWT = "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
+AIRFLOW_HOST_API_ROOT = "http://localhost:8080/api/v1/"
+DEFAULT_OM_AIRFLOW_CONNECTION = "openmetadata_conn_id"
+DEFAULT_AIRFLOW_HEADERS = {
+    "Content-Type": "application/json",
+    "Authorization": "Basic YWRtaW46YWRtaW4=",
+}
+OM_LINEAGE_DAG_NAME = "lineage_tutorial_operator"
+PIPELINE_SERVICE_NAME = "airflow_lineage_op_service"
+
+
+def get_task_status_type_by_name(pipeline: Pipeline, name: str) -> Optional[StatusType]:
+    """
+    Given a pipeline, return the execution status of the task with the given name
+    """
+    return next(
+        (
+            status.executionStatus
+            for status in pipeline.pipelineStatus.taskStatus
+            if status.name == name
+        ),
+        None,
+    )
+
+
 class AirflowLineageTest(TestCase):
     """
-    Run this test installing the necessary airflow version
+    This test will trigger an Airflow DAG and validate that the
+    OpenMetadata Lineage Operator can properly handle the
+    metadata ingestion and process inlets and outlets.
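+
+    It assumes a local Airflow REST API on localhost:8080 and an
+    OpenMetadata server on localhost:8585, as set in the constants above.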
""" server_config = OpenMetadataConnection( - hostPort="http://localhost:8585/api", + hostPort=OM_HOST_PORT, authProvider="openmetadata", - securityConfig=OpenMetadataJWTClientConfig( - jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" - ), + securityConfig=OpenMetadataJWTClientConfig(jwtToken=OM_JWT), ) metadata = OpenMetadata(server_config) @@ -87,8 +105,6 @@ class AirflowLineageTest(TestCase): ) service_type = "databaseService" - backend = OpenMetadataLineageBackend() - @classmethod def setUpClass(cls) -> None: """ @@ -118,49 +134,20 @@ class AirflowLineageTest(TestCase): id=create_schema_entity.id, name="test-schema", type="databaseSchema" ) - create = CreateTableRequest( - name="lineage-test", + create_inlet = CreateTableRequest( + name="lineage-test-inlet", databaseSchema=schema_reference, columns=[Column(name="id", dataType=DataType.BIGINT)], ) - cls.table = cls.metadata.create_or_update(data=create) + create_outlet = CreateTableRequest( + name="lineage-test-outlet", + databaseSchema=schema_reference, + columns=[Column(name="id", dataType=DataType.BIGINT)], + ) - with DAG( - "lineage", - description="A lineage test DAG", - schedule_interval=timedelta(days=1), - start_date=datetime(2021, 1, 1), - ) as dag: - - t1 = BashOperator( # Using BashOperator as a random example - task_id="task1", - bash_command="date", - outlets={ - "tables": [ - "test-service-table-lineage.test-db.test-schema.lineage-test" - ] - }, - ) - - t2 = BashOperator( # Using BashOperator as a random example - task_id="task2", - bash_command="sleep 5", - inlets={ - "tables": [ - "test-service-table-lineage.test-db.test-schema.lineage-test" - ] - }, - ) - - t3 = BashOperator( - task_id="task3", - bash_command="echo", - ) - - t1 >> t2 >> t3 - - cls.dag = dag + cls.table_inlet = cls.metadata.create_or_update(data=create_inlet) + cls.table_outlet = cls.metadata.create_or_update(data=create_outlet) @classmethod def tearDownClass(cls) -> None: @@ -181,280 +168,134 @@ class AirflowLineageTest(TestCase): hard_delete=True, ) - def test_xlets(self): - """ - Verify that we can extract inlets and outlets - """ - - self.assertIsNone(get_xlets(self.dag.get_task("task1"), "_inlets")) - self.assertEqual( - ["test-service-table-lineage.test-db.test-schema.lineage-test"], - get_xlets(self.dag.get_task("task1"), "_outlets"), + # Service ID created from the Airflow Lineage Operator in the + # example DAG + pipeline_service_id = str( + cls.metadata.get_by_name( + entity=PipelineService, fqn=PIPELINE_SERVICE_NAME + ).id.__root__ ) - self.assertEqual( - ["test-service-table-lineage.test-db.test-schema.lineage-test"], - get_xlets(self.dag.get_task("task2"), "_inlets"), - ) - self.assertIsNone(get_xlets(self.dag.get_task("task2"), "_outlets")) - - self.assertIsNone(get_xlets(self.dag.get_task("task3"), "_inlets")) - self.assertIsNone(get_xlets(self.dag.get_task("task3"), "_outlets")) - - @mock.patch.dict( - os.environ, - {"AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME": "int_airflow"}, - clear=True, - ) - def 
-        """
-        Test end to end
-        """
-
-        self.backend.send_lineage(
-            operator=self.dag.get_task("task1"),
-            context={
-                "dag": self.dag,
-                "task": self.dag.get_task("task1"),
-                "task_instance": TaskInstance(
-                    task=self.dag.get_task("task1"),
-                    execution_date=datetime.strptime(
-                        "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
-                    ),
-                    state="running",
-                ),
-            },
-        )
-
-        self.assertIsNotNone(
-            self.metadata.get_by_name(entity=Pipeline, fqn="int_airflow.lineage")
-        )
-
-        lineage = self.metadata.get_lineage_by_name(
-            entity=Pipeline, fqn="int_airflow.lineage"
-        )
-
-        nodes = {node["id"] for node in lineage["nodes"]}
-        self.assertIn(str(self.table.id.__root__), nodes)
-
-    @mock.patch.dict(
-        os.environ,
-        {"AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME": "int_airflow"},
-        clear=True,
-    )
-    def test_lineage_task_group(self):
-        """
-        Test end to end for task groups.
-
-        Run the lineage execution mimicking
-        the execution of three tasks
-        """
-
-        with DAG(
-            "task_group_lineage",
-            description="A lineage test DAG",
-            schedule_interval=timedelta(days=1),
-            start_date=datetime(2021, 1, 1),
-        ) as dag:
-            t0 = DummyOperator(task_id="start")
-
-            # Start Task Group definition
-            with TaskGroup(group_id="group1") as tg1:
-                t1 = DummyOperator(task_id="task1")
-                t2 = DummyOperator(task_id="task2")
-
-                t1 >> t2
-            # End Task Group definition
-
-            t3 = DummyOperator(task_id="end")
-
-            # Set Task Group's (tg1) dependencies
-            t0 >> tg1 >> t3
-
-        self.backend.send_lineage(
-            operator=dag.get_task("group1.task1"),
-            context={
-                "dag": dag,
-                "task": dag.get_task("group1.task1"),
-                "task_instance": TaskInstance(
-                    task=dag.get_task("group1.task1"),
-                    execution_date=datetime.strptime(
-                        "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
-                    ),
-                    state="running",
-                ),
-            },
-        )
-
-        self.backend.send_lineage(
-            operator=dag.get_task("group1.task2"),
-            context={
-                "dag": dag,
-                "task": dag.get_task("group1.task2"),
-                "task_instance": TaskInstance(
-                    task=dag.get_task("group1.task2"),
-                    execution_date=datetime.strptime(
-                        "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
-                    ),
-                    state="running",
-                ),
-            },
-        )
-
-        self.backend.send_lineage(
-            operator=dag.get_task("end"),
-            context={
-                "dag": dag,
-                "task": dag.get_task("end"),
-                "task_instance": TaskInstance(
-                    task=dag.get_task("end"),
-                    execution_date=datetime.strptime(
-                        "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
-                    ),
-                    state="running",
-                ),
-            },
-        )
-
-        pipeline: Pipeline = self.metadata.get_by_name(
-            entity=Pipeline, fqn="int_airflow.task_group_lineage", fields=["tasks"]
-        )
-        self.assertIsNotNone(pipeline)
-        self.assertIn("group1.task1", {task.name for task in pipeline.tasks})
-        self.assertIn("group1.task2", {task.name for task in pipeline.tasks})
-        self.assertIn("end", {task.name for task in pipeline.tasks})
-
-        # Validate URL building
-        self.assertEqual("/tree?dag_id=task_group_lineage", pipeline.pipelineUrl)
-        self.assertIn(
-            "/taskinstance/list/?flt1_dag_id_equals=task_group_lineage&_flt_3_task_id=end",
-            {task.taskUrl for task in pipeline.tasks},
-        )
-
-    @mock.patch.dict(
-        os.environ,
-        {"AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME": "int_airflow"},
-        clear=True,
-    )
-    def test_clean_tasks(self):
-        """
-        Check that we can safely remove tasks from a Pipeline
-        """
-
-        with DAG(
-            "clean_test",
-            description="A lineage test DAG",
-            schedule_interval=timedelta(days=1),
-            start_date=datetime(2021, 1, 1),
-        ) as dag:
-            t1 = BashOperator(  # Using BashOperator as a random example
-                task_id="task1",
-                bash_command="date",
-            )
-
-            t2 = BashOperator(  # Using BashOperator as a random example
-                task_id="task2",
-                bash_command="sleep 5",
-            )
-
-            t1 >> t2
-
-        self.backend.send_lineage(
-            operator=dag.get_task("task1"),
-            context={
-                "dag": dag,
-                "task": dag.get_task("task1"),
-                "task_instance": TaskInstance(
-                    task=dag.get_task("task1"),
-                    execution_date=datetime.strptime(
-                        "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
-                    ),
-                    state="running",
-                ),
-            },
-        )
-
-        self.backend.send_lineage(
-            operator=dag.get_task("task2"),
-            context={
-                "dag": dag,
-                "task": dag.get_task("task2"),
-                "task_instance": TaskInstance(
-                    task=dag.get_task("task2"),
-                    execution_date=datetime.strptime(
-                        "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
-                    ),
-                    state="running",
-                ),
-            },
-        )
-
-        pipeline = self.metadata.get_by_name(
-            entity=Pipeline, fqn="int_airflow.clean_test", fields=["tasks"]
-        )
-        self.assertIsNotNone(pipeline)
-        self.assertIn("task1", {task.name for task in pipeline.tasks})
-        self.assertIn("task2", {task.name for task in pipeline.tasks})
-
-        with DAG(
-            "clean_test",
-            description="A lineage test DAG",
-            schedule_interval=timedelta(days=1),
-            start_date=datetime(2021, 1, 1),
-        ) as dag:
-            t1 = BashOperator(
-                task_id="task1",
-                bash_command="date",
-            )
-
-            renamed_task = BashOperator(
-                task_id="new_task2",
-                bash_command="sleep 5",
-            )
-
-            t1 >> renamed_task
-
-        self.backend.send_lineage(
-            operator=dag.get_task("task1"),
-            context={
-                "dag": dag,
-                "task": dag.get_task("task1"),
-                "task_instance": TaskInstance(
-                    task=dag.get_task("task1"),
-                    execution_date=datetime.strptime(
-                        "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
-                    ),
-                    state="running",
-                ),
-            },
-        )
-
-        self.backend.send_lineage(
-            operator=dag.get_task("new_task2"),
-            context={
-                "dag": dag,
-                "task": dag.get_task("new_task2"),
-                "task_instance": TaskInstance(
-                    task=dag.get_task("new_task2"),
-                    execution_date=datetime.strptime(
-                        "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
-                    ),
-                    state="running",
-                ),
-            },
-        )
-
-        pipeline: Pipeline = self.metadata.get_by_name(
-            entity=Pipeline, fqn="int_airflow.clean_test", fields=["tasks"]
-        )
-        self.assertIsNotNone(pipeline)
-        self.assertIn("task1", {task.name for task in pipeline.tasks})
-        self.assertIn("new_task2", {task.name for task in pipeline.tasks})
-        self.assertNotIn("task2", {task.name for task in pipeline.tasks})
-
-        self.metadata.delete(
-            entity=Pipeline,
-            entity_id=pipeline.id,
+        cls.metadata.delete(
+            entity=PipelineService,
+            entity_id=pipeline_service_id,
             recursive=True,
             hard_delete=True,
         )
+
+    @pytest.mark.order(1)
+    def test_dag_runs(self) -> None:
+        """
+        Trigger the Airflow DAG and wait for it to complete.
+
+        Note that the DAG definition lives in
+        ingestion/examples/airflow/dags/airflow_lineage_operator.py and is
+        expected to run successfully, which lets us validate the task
+        statuses afterward.
+        """
+
+        # 1. Validate that the OpenMetadata connection exists
+        res = requests.get(
+            AIRFLOW_HOST_API_ROOT + f"connections/{DEFAULT_OM_AIRFLOW_CONNECTION}",
+            headers=DEFAULT_AIRFLOW_HEADERS,
+        )
+        if res.status_code != 200:
+            raise RuntimeError(
+                f"Could not fetch {DEFAULT_OM_AIRFLOW_CONNECTION} connection"
+            )
+
+        # 2. Enable (unpause) the DAG
+        res = requests.patch(
+            AIRFLOW_HOST_API_ROOT + f"dags/{OM_LINEAGE_DAG_NAME}",
+            json={"is_paused": False},
+            headers=DEFAULT_AIRFLOW_HEADERS,
+        )
+        if res.status_code != 200:
+            raise RuntimeError(f"Could not enable {OM_LINEAGE_DAG_NAME} DAG")
+
+        # 3. Trigger the DAG
+        res = requests.post(
+            AIRFLOW_HOST_API_ROOT + f"dags/{OM_LINEAGE_DAG_NAME}/dagRuns",
+            json={
+                # The DAG's start_date is 2021-01-01, so any past logical_date
+                # is accepted; the API expects ISO timestamps such as
+                # "2019-08-24T14:15:22Z"
+                "logical_date": datetime.strftime(
+                    datetime.now() - timedelta(hours=1), "%Y-%m-%dT%H:%M:%SZ"
+                ),
+            },
+            headers=DEFAULT_AIRFLOW_HEADERS,
+        )
+        if res.status_code != 200:
+            raise RuntimeError(f"Could not trigger {OM_LINEAGE_DAG_NAME} DAG")
+        dag_run_id = res.json()["dag_run_id"]
+
+        # 4. Wait until the DAG run reaches the `success` state
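+        # Bounded wait: poll up to 6 times, 5 seconds apart (~30s), then fail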
+        state = "queued"
+        tries = 0
+        while state != "success" and tries <= 5:
+            tries += 1
+            time.sleep(5)
+
+            res = requests.get(
+                AIRFLOW_HOST_API_ROOT
+                + f"dags/{OM_LINEAGE_DAG_NAME}/dagRuns/{dag_run_id}",
+                headers=DEFAULT_AIRFLOW_HEADERS,
+            )
+            state = res.json().get("state")
+
+        if state != "success":
+            raise RuntimeError(f"DAG {OM_LINEAGE_DAG_NAME} has not finished on time.")
+
+    @pytest.mark.order(2)
+    def test_pipeline_created(self) -> None:
+        """
+        Validate that the pipeline has been created
+        """
+        pipeline_service: PipelineService = self.metadata.get_by_name(
+            entity=PipelineService, fqn=PIPELINE_SERVICE_NAME
+        )
+        self.assertIsNotNone(pipeline_service)
+
+        pipeline: Pipeline = self.metadata.get_by_name(
+            entity=Pipeline,
+            fqn=f"{PIPELINE_SERVICE_NAME}.{OM_LINEAGE_DAG_NAME}",
+            fields=["tasks", "pipelineStatus"],
+        )
+        self.assertIsNotNone(pipeline)
+
+        task_names = {task.name for task in pipeline.tasks}
+        self.assertEqual(
+            task_names, {"print_date", "sleep", "templated", "lineage_op"}
+        )
+
+        self.assertEqual(pipeline.description.__root__, "A simple tutorial DAG")
+
+        # Validate status
+        self.assertEqual(
+            get_task_status_type_by_name(pipeline, "print_date"), StatusType.Successful
+        )
+        self.assertEqual(
+            get_task_status_type_by_name(pipeline, "sleep"), StatusType.Successful
+        )
+        self.assertEqual(
+            get_task_status_type_by_name(pipeline, "templated"), StatusType.Successful
+        )
+
+    @pytest.mark.order(2)
+    def test_pipeline_lineage(self) -> None:
+        """
+        Validate that the pipeline has proper lineage
+        """
+        lineage = self.metadata.get_lineage_by_name(
+            entity=Pipeline,
+            fqn=f"{PIPELINE_SERVICE_NAME}.{OM_LINEAGE_DAG_NAME}",
+        )
+        node_names = {node["name"] for node in lineage.get("nodes") or []}
+        self.assertEqual(node_names, {"lineage-test-inlet", "lineage-test-outlet"})
+        self.assertEqual(len(lineage.get("upstreamEdges")), 1)
+        self.assertEqual(len(lineage.get("downstreamEdges")), 1)
+        self.assertEqual(
+            lineage["upstreamEdges"][0]["fromEntity"], str(self.table_inlet.id.__root__)
+        )
+        self.assertEqual(
+            lineage["downstreamEdges"][0]["toEntity"],
+            str(self.table_outlet.id.__root__),
+        )