Airflow e2e integration test (#9363)

* Prep airflow operator integration tests

* Add integration test to Makefile
Pere Miquel Brull 2022-12-17 04:52:12 +01:00 committed by GitHub
parent b3160130f2
commit 3b7ae73473
3 changed files with 229 additions and 368 deletions

File: Makefile

@@ -69,7 +69,7 @@ generate: ## Generate the pydantic models from the JSON Schemas to the ingestio
## Ingestion tests & QA
.PHONY: run_ometa_integration_tests
run_ometa_integration_tests: ## Run Python integration tests
coverage run --rcfile ingestion/.coveragerc -a --branch -m pytest -c ingestion/setup.cfg --junitxml=ingestion/junit/test-results-integration.xml ingestion/tests/integration/ometa ingestion/tests/integration/orm_profiler ingestion/tests/integration/test_suite ingestion/tests/integration/data_insight
coverage run --rcfile ingestion/.coveragerc -a --branch -m pytest -c ingestion/setup.cfg --junitxml=ingestion/junit/test-results-integration.xml ingestion/tests/integration/ometa ingestion/tests/integration/orm_profiler ingestion/tests/integration/test_suite ingestion/tests/integration/data_insight ingestion/tests/integration/lineage
.PHONY: unit_ingestion
unit_ingestion: ## Run Python unit tests
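For reference, the same tests can be driven from Python through pytest's entry point; a minimal sketch mirroring the arguments the target above passes on the command line (coverage instrumentation omitted):

    import pytest

    # Invoke only the new lineage integration tests with the repo's pytest config,
    # mirroring what run_ometa_integration_tests passes to pytest.
    pytest.main([
        "-c", "ingestion/setup.cfg",
        "--junitxml=ingestion/junit/test-results-integration.xml",
        "ingestion/tests/integration/lineage",
    ])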

File: examples/airflow_lineage_operator.py

@@ -9,12 +9,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
You can run this DAG from the default OM installation
"""
You can run this DAG from the default OM installation.
from datetime import datetime, timedelta
For this DAG to run properly we expected an OpenMetadata
Airflow connection named `openmetadata_conn_id`.
"""
from datetime import datetime
from textwrap import dedent
import requests
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
@@ -22,31 +26,54 @@ from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow_provider_openmetadata.hooks.openmetadata import OpenMetadataHook
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
from airflow_provider_openmetadata.lineage.operator import OpenMetadataLineageOperator
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
default_args = {
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
OM_HOST_PORT = "http://localhost:8585/api"
OM_JWT = "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
AIRFLOW_HOST_API_ROOT = "http://localhost:8080/api/v1/"
DEFAULT_OM_AIRFLOW_CONNECTION = "openmetadata_conn_id"
DEFAULT_AIRFLOW_HEADERS = {
    "Content-Type": "application/json",
    "Authorization": "Basic YWRtaW46YWRtaW4=",
}
default_args = {
    "retries": 0,
}
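The `Authorization` value in DEFAULT_AIRFLOW_HEADERS is plain HTTP Basic auth against the local Airflow's default admin:admin credentials; a quick sketch of how the hard-coded token is derived:

    import base64

    # base64("admin:admin") produces the token used in DEFAULT_AIRFLOW_HEADERS
    token = base64.b64encode(b"admin:admin").decode()
    assert f"Basic {token}" == "Basic YWRtaW46YWRtaW4="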
def explode():
raise Exception("Oh no!")
# Create the default OpenMetadata Airflow Connection (if it does not exist)
res = requests.get(
AIRFLOW_HOST_API_ROOT + f"connections/{DEFAULT_OM_AIRFLOW_CONNECTION}",
headers=DEFAULT_AIRFLOW_HEADERS,
)
if res.status_code == 404: # not found
requests.post(
AIRFLOW_HOST_API_ROOT + "connections",
json={
"connection_id": DEFAULT_OM_AIRFLOW_CONNECTION,
"conn_type": "openmetadata",
"host": "openmetadata-server",
"schema": "http",
"port": 8585,
"password": OM_JWT,
},
headers=DEFAULT_AIRFLOW_HEADERS,
)
elif res.status_code != 200:
raise RuntimeError(f"Could not fetch {DEFAULT_OM_AIRFLOW_CONNECTION} connection")
with DAG(
"lineage_tutorial_operator",
default_args=default_args,
description="A simple tutorial DAG",
schedule_interval=timedelta(days=1),
schedule_interval=None,
is_paused_upon_creation=True,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
@@ -56,7 +83,11 @@ with DAG(
t1 = BashOperator(
task_id="print_date",
bash_command="date",
outlets={"tables": ["sample_data.ecommerce_db.shopify.dim_address"]},
outlets={
"tables": [
"test-service-table-lineage.test-db.test-schema.lineage-test-outlet"
]
},
)
t2 = BashOperator(
@@ -64,14 +95,11 @@ with DAG(
depends_on_past=False,
bash_command="sleep 1",
retries=3,
inlets={"tables": ["sample_data.ecommerce_db.shopify.dim_customer"]},
)
risen = PythonOperator(
task_id="explode",
provide_context=True,
python_callable=explode,
retries=0,
inlets={
"tables": [
"test-service-table-lineage.test-db.test-schema.lineage-test-inlet"
]
},
)
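The strings in `inlets`/`outlets` are OpenMetadata fully qualified names of the form service.database.schema.table, matching the entities the integration test creates in its setUpClass; a tiny illustrative check:

    # The four FQN parts map to the test's service, database, schema, and table
    fqn = ".".join(
        ["test-service-table-lineage", "test-db", "test-schema", "lineage-test-inlet"]
    )
    assert fqn == "test-service-table-lineage.test-db.test-schema.lineage-test-inlet"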
dag.doc_md = (
@@ -99,20 +127,12 @@ with DAG(
t1 >> [t2, t3]
server_config = OpenMetadataConnection(
hostPort="http://localhost:8585/api",
authProvider="openmetadata",
securityConfig=OpenMetadataJWTClientConfig(
jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
),
)
t4 = OpenMetadataLineageOperator(
task_id="lineage_op",
depends_on_past=False,
server_config=server_config,
server_config=OpenMetadataHook(DEFAULT_OM_AIRFLOW_CONNECTION).get_conn(),
service_name="airflow_lineage_op_service",
only_keep_dag_lineage=True,
)
t1 >> t4
[t1, t2, t3] >> t4
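With this change the DAG no longer embeds credentials: `OpenMetadataHook(...).get_conn()` resolves the `openmetadata_conn_id` Airflow connection bootstrapped above into the same OpenMetadataConnection object that was previously declared inline. A minimal sketch of that equivalence (the printed host value is an assumption based on the bootstrap fields, not asserted by the commit):

    from airflow_provider_openmetadata.hooks.openmetadata import OpenMetadataHook

    # Resolve the Airflow connection into an OpenMetadataConnection
    server_config = OpenMetadataHook(DEFAULT_OM_AIRFLOW_CONNECTION).get_conn()
    print(server_config.hostPort)  # e.g. http://openmetadata-server:8585/api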

File: ingestion/tests/integration/lineage (Airflow lineage operator integration test)

@@ -10,26 +10,18 @@
# limitations under the License.
"""
Test airflow lineage backend
Test airflow lineage operator and hook.
These tests should be run with Airflow 2.1.4
Other airflow versions require a different way to
mock the DAG and Task runs.
This test is coupled with the example DAG `lineage_tutorial_operator`
"""
import os
import time
from datetime import datetime, timedelta
from unittest import TestCase, mock
from typing import Optional
from unittest import TestCase
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
import pytest
import requests
from airflow_provider_openmetadata.lineage.backend import OpenMetadataLineageBackend
from airflow_provider_openmetadata.lineage.utils import get_xlets
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
@@ -38,7 +30,7 @@ from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.pipeline import Pipeline, StatusType
from metadata.generated.schema.entity.data.table import Column, DataType
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
@@ -51,24 +43,50 @@ from metadata.generated.schema.entity.services.databaseService import (
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
OM_HOST_PORT = "http://localhost:8585/api"
OM_JWT = "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
AIRFLOW_HOST_API_ROOT = "http://localhost:8080/api/v1/"
DEFAULT_OM_AIRFLOW_CONNECTION = "openmetadata_conn_id"
DEFAULT_AIRFLOW_HEADERS = {
"Content-Type": "application/json",
"Authorization": "Basic YWRtaW46YWRtaW4=",
}
OM_LINEAGE_DAG_NAME = "lineage_tutorial_operator"
PIPELINE_SERVICE_NAME = "airflow_lineage_op_service"
def get_task_status_type_by_name(
    pipeline: Pipeline, name: str
) -> Optional[StatusType]:
    """
    Given a pipeline, return the execution status of the task with the given name
    """
return next(
(
status.executionStatus
for status in pipeline.pipelineStatus.taskStatus
if status.name == name
),
None,
)
class AirflowLineageTest(TestCase):
"""
Run this test installing the necessary airflow version
This test will trigger an Airflow DAG and validate that the
OpenMetadata Lineage Operator can properly handle the
metadata ingestion and processes inlets and outlets.
"""
server_config = OpenMetadataConnection(
hostPort="http://localhost:8585/api",
hostPort=OM_HOST_PORT,
authProvider="openmetadata",
securityConfig=OpenMetadataJWTClientConfig(
jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
),
securityConfig=OpenMetadataJWTClientConfig(jwtToken=OM_JWT),
)
metadata = OpenMetadata(server_config)
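Before the assertions run, this client can be sanity-checked against the local server; a one-line sketch, assuming the ingestion client's health_check helper:

    # Fails fast if the OpenMetadata server at OM_HOST_PORT is unreachable
    assert OpenMetadata(server_config).health_check()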
@@ -87,8 +105,6 @@ class AirflowLineageTest(TestCase):
)
service_type = "databaseService"
backend = OpenMetadataLineageBackend()
@classmethod
def setUpClass(cls) -> None:
"""
@@ -118,49 +134,20 @@ class AirflowLineageTest(TestCase):
id=create_schema_entity.id, name="test-schema", type="databaseSchema"
)
        create = CreateTableRequest(
            name="lineage-test",
            databaseSchema=schema_reference,
            columns=[Column(name="id", dataType=DataType.BIGINT)],
        )
        cls.table = cls.metadata.create_or_update(data=create)
        with DAG(
            "lineage",
            description="A lineage test DAG",
            schedule_interval=timedelta(days=1),
            start_date=datetime(2021, 1, 1),
        ) as dag:
            t1 = BashOperator(  # Using BashOperator as a random example
                task_id="task1",
                bash_command="date",
                outlets={
                    "tables": [
                        "test-service-table-lineage.test-db.test-schema.lineage-test"
                    ]
                },
            )
            t2 = BashOperator(  # Using BashOperator as a random example
                task_id="task2",
                bash_command="sleep 5",
                inlets={
                    "tables": [
                        "test-service-table-lineage.test-db.test-schema.lineage-test"
                    ]
                },
            )
            t3 = BashOperator(
                task_id="task3",
                bash_command="echo",
            )
            t1 >> t2 >> t3
        cls.dag = dag
        create_inlet = CreateTableRequest(
            name="lineage-test-inlet",
            databaseSchema=schema_reference,
            columns=[Column(name="id", dataType=DataType.BIGINT)],
        )
        create_outlet = CreateTableRequest(
            name="lineage-test-outlet",
            databaseSchema=schema_reference,
            columns=[Column(name="id", dataType=DataType.BIGINT)],
        )
        cls.table_inlet = cls.metadata.create_or_update(data=create_inlet)
        cls.table_outlet = cls.metadata.create_or_update(data=create_outlet)
@classmethod
def tearDownClass(cls) -> None:
@@ -181,280 +168,134 @@ class AirflowLineageTest(TestCase):
hard_delete=True,
)
        # Service ID created from the Airflow Lineage Operator in the
        # example DAG
        pipeline_service_id = str(
            cls.metadata.get_by_name(
                entity=PipelineService, fqn=PIPELINE_SERVICE_NAME
            ).id.__root__
        )

    def test_xlets(self):
        """
        Verify that we can extract inlets and outlets
        """
        self.assertIsNone(get_xlets(self.dag.get_task("task1"), "_inlets"))
        self.assertEqual(
            ["test-service-table-lineage.test-db.test-schema.lineage-test"],
            get_xlets(self.dag.get_task("task1"), "_outlets"),
        )
        self.assertEqual(
            ["test-service-table-lineage.test-db.test-schema.lineage-test"],
            get_xlets(self.dag.get_task("task2"), "_inlets"),
        )
        self.assertIsNone(get_xlets(self.dag.get_task("task2"), "_outlets"))
        self.assertIsNone(get_xlets(self.dag.get_task("task3"), "_inlets"))
        self.assertIsNone(get_xlets(self.dag.get_task("task3"), "_outlets"))
@mock.patch.dict(
os.environ,
{"AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME": "int_airflow"},
clear=True,
)
def test_lineage(self):
"""
Test end to end
"""
self.backend.send_lineage(
operator=self.dag.get_task("task1"),
context={
"dag": self.dag,
"task": self.dag.get_task("task1"),
"task_instance": TaskInstance(
task=self.dag.get_task("task1"),
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
state="running",
),
},
)
self.assertIsNotNone(
self.metadata.get_by_name(entity=Pipeline, fqn="int_airflow.lineage")
)
lineage = self.metadata.get_lineage_by_name(
entity=Pipeline, fqn="int_airflow.lineage"
)
nodes = {node["id"] for node in lineage["nodes"]}
self.assertIn(str(self.table.id.__root__), nodes)
@mock.patch.dict(
os.environ,
{"AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME": "int_airflow"},
clear=True,
)
def test_lineage_task_group(self):
"""
Test end to end for task groups.
Run the lineage execution mimicking
the execution of three tasks
"""
with DAG(
"task_group_lineage",
description="A lineage test DAG",
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
) as dag:
t0 = DummyOperator(task_id="start")
# Start Task Group definition
with TaskGroup(group_id="group1") as tg1:
t1 = DummyOperator(task_id="task1")
t2 = DummyOperator(task_id="task2")
t1 >> t2
# End Task Group definition
t3 = DummyOperator(task_id="end")
# Set Task Group's (tg1) dependencies
t0 >> tg1 >> t3
self.backend.send_lineage(
operator=dag.get_task("group1.task1"),
context={
"dag": dag,
"task": dag.get_task("group1.task1"),
"task_instance": TaskInstance(
task=dag.get_task("group1.task1"),
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
state="running",
),
},
)
self.backend.send_lineage(
operator=dag.get_task("group1.task2"),
context={
"dag": dag,
"task": dag.get_task("group1.task2"),
"task_instance": TaskInstance(
task=dag.get_task("group1.task2"),
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
state="running",
),
},
)
self.backend.send_lineage(
operator=dag.get_task("end"),
context={
"dag": dag,
"task": dag.get_task("end"),
"task_instance": TaskInstance(
task=dag.get_task("end"),
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
state="running",
),
},
)
pipeline: Pipeline = self.metadata.get_by_name(
entity=Pipeline, fqn="int_airflow.task_group_lineage", fields=["tasks"]
)
self.assertIsNotNone(pipeline)
self.assertIn("group1.task1", {task.name for task in pipeline.tasks})
self.assertIn("group1.task2", {task.name for task in pipeline.tasks})
self.assertIn("end", {task.name for task in pipeline.tasks})
# Validate URL building
self.assertEqual("/tree?dag_id=task_group_lineage", pipeline.pipelineUrl)
self.assertIn(
"/taskinstance/list/?flt1_dag_id_equals=task_group_lineage&_flt_3_task_id=end",
{task.taskUrl for task in pipeline.tasks},
)
@mock.patch.dict(
os.environ,
{"AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME": "int_airflow"},
clear=True,
)
def test_clean_tasks(self):
"""
Check that we can safely remove tasks from a Pipeline
"""
with DAG(
"clean_test",
description="A lineage test DAG",
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
) as dag:
t1 = BashOperator( # Using BashOperator as a random example
task_id="task1",
bash_command="date",
)
t2 = BashOperator( # Using BashOperator as a random example
task_id="task2",
bash_command="sleep 5",
)
t1 >> t2
self.backend.send_lineage(
operator=dag.get_task("task1"),
context={
"dag": dag,
"task": dag.get_task("task1"),
"task_instance": TaskInstance(
task=dag.get_task("task1"),
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
state="running",
),
},
)
self.backend.send_lineage(
operator=dag.get_task("task2"),
context={
"dag": dag,
"task": dag.get_task("task2"),
"task_instance": TaskInstance(
task=dag.get_task("task2"),
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
state="running",
),
},
)
pipeline = self.metadata.get_by_name(
entity=Pipeline, fqn="int_airflow.clean_test", fields=["tasks"]
)
self.assertIsNotNone(pipeline)
self.assertIn("task1", {task.name for task in pipeline.tasks})
self.assertIn("task2", {task.name for task in pipeline.tasks})
with DAG(
"clean_test",
description="A lineage test DAG",
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
) as dag:
t1 = BashOperator(
task_id="task1",
bash_command="date",
)
renamed_task = BashOperator(
task_id="new_task2",
bash_command="sleep 5",
)
t1 >> renamed_task
self.backend.send_lineage(
operator=dag.get_task("task1"),
context={
"dag": dag,
"task": dag.get_task("task1"),
"task_instance": TaskInstance(
task=dag.get_task("task1"),
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
state="running",
),
},
)
self.backend.send_lineage(
operator=dag.get_task("new_task2"),
context={
"dag": dag,
"task": dag.get_task("new_task2"),
"task_instance": TaskInstance(
task=dag.get_task("new_task2"),
execution_date=datetime.strptime(
"2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
),
state="running",
),
},
)
pipeline: Pipeline = self.metadata.get_by_name(
entity=Pipeline, fqn="int_airflow.clean_test", fields=["tasks"]
)
self.assertIsNotNone(pipeline)
self.assertIn("task1", {task.name for task in pipeline.tasks})
self.assertIn("new_task2", {task.name for task in pipeline.tasks})
self.assertNotIn("task2", {task.name for task in pipeline.tasks})
        self.metadata.delete(
            entity=Pipeline,
            entity_id=pipeline.id,
            recursive=True,
            hard_delete=True,
        )
        cls.metadata.delete(
            entity=PipelineService,
            entity_id=pipeline_service_id,
            recursive=True,
            hard_delete=True,
        )
@pytest.mark.order(1)
def test_dag_runs(self) -> None:
"""
Trigger the Airflow DAG and wait until it runs.
Note that the DAG definition is in examples/airflow_lineage_operator.py
and it is expected to fail. This will allow us to validate
the task status afterward.
"""
# 1. Validate that the OpenMetadata connection exists
res = requests.get(
AIRFLOW_HOST_API_ROOT + f"connections/{DEFAULT_OM_AIRFLOW_CONNECTION}",
headers=DEFAULT_AIRFLOW_HEADERS,
)
if res.status_code != 200:
raise RuntimeError(
f"Could not fetch {DEFAULT_OM_AIRFLOW_CONNECTION} connection"
)
# 2. Enable the DAG
res = requests.patch(
AIRFLOW_HOST_API_ROOT + f"dags/{OM_LINEAGE_DAG_NAME}",
json={"is_paused": False},
headers=DEFAULT_AIRFLOW_HEADERS,
)
if res.status_code != 200:
raise RuntimeError(f"Could not enable {OM_LINEAGE_DAG_NAME} DAG")
# 3. Trigger the DAG
res = requests.post(
AIRFLOW_HOST_API_ROOT + f"dags/{OM_LINEAGE_DAG_NAME}/dagRuns",
json={
                # logical_date must fall after the DAG's start_date (2021-01-01);
                # the API expects an ISO-8601 timestamp, e.g. "2019-08-24T14:15:22Z"
"logical_date": datetime.strftime(
datetime.now() - timedelta(hours=1), "%Y-%m-%dT%H:%M:%SZ"
),
},
headers=DEFAULT_AIRFLOW_HEADERS,
)
if res.status_code != 200:
raise RuntimeError(f"Could not trigger {OM_LINEAGE_DAG_NAME} DAG")
dag_run_id = res.json()["dag_run_id"]
# 4. Wait until the DAG is flagged as `successful`
state = "queued"
tries = 0
while state != "success" and tries <= 5:
tries += 1
time.sleep(5)
res = requests.get(
AIRFLOW_HOST_API_ROOT
+ f"dags/{OM_LINEAGE_DAG_NAME}/dagRuns/{dag_run_id}",
headers=DEFAULT_AIRFLOW_HEADERS,
)
state = res.json().get("state")
if state != "success":
raise RuntimeError(f"DAG {OM_LINEAGE_DAG_NAME} has not finished on time.")
@pytest.mark.order(2)
def test_pipeline_created(self) -> None:
"""
Validate that the pipeline has been created
"""
pipeline_service: PipelineService = self.metadata.get_by_name(
entity=PipelineService, fqn=PIPELINE_SERVICE_NAME
)
self.assertIsNotNone(pipeline_service)
pipeline: Pipeline = self.metadata.get_by_name(
entity=Pipeline,
fqn=f"{PIPELINE_SERVICE_NAME}.{OM_LINEAGE_DAG_NAME}",
fields=["tasks", "pipelineStatus"],
)
self.assertIsNotNone(pipeline)
        task_names = {task.name for task in pipeline.tasks}
        self.assertEqual(
            task_names, {"print_date", "sleep", "templated", "lineage_op"}
        )
self.assertEqual(pipeline.description.__root__, "A simple tutorial DAG")
# Validate status
self.assertEqual(
get_task_status_type_by_name(pipeline, "print_date"), StatusType.Successful
)
self.assertEqual(
get_task_status_type_by_name(pipeline, "sleep"), StatusType.Successful
)
self.assertEqual(
get_task_status_type_by_name(pipeline, "templated"), StatusType.Successful
)
@pytest.mark.order(2)
def test_pipeline_lineage(self) -> None:
"""
Validate that the pipeline has proper lineage
"""
lineage = self.metadata.get_lineage_by_name(
entity=Pipeline,
fqn=f"{PIPELINE_SERVICE_NAME}.{OM_LINEAGE_DAG_NAME}",
)
        node_names = {node["name"] for node in lineage.get("nodes") or []}
self.assertEqual(node_names, {"lineage-test-inlet", "lineage-test-outlet"})
self.assertEqual(len(lineage.get("upstreamEdges")), 1)
self.assertEqual(len(lineage.get("downstreamEdges")), 1)
self.assertEqual(
lineage["upstreamEdges"][0]["fromEntity"], str(self.table_inlet.id.__root__)
)
self.assertEqual(
lineage["downstreamEdges"][0]["toEntity"],
str(self.table_outlet.id.__root__),
)
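For context, these assertions walk a lineage payload of roughly the following shape; an illustrative sketch of only the fields the test reads, not the full lineage schema:

    # Illustrative structure only; real ids are UUID strings
    lineage = {
        "nodes": [
            {"id": "<inlet-table-id>", "name": "lineage-test-inlet"},
            {"id": "<outlet-table-id>", "name": "lineage-test-outlet"},
        ],
        # inlet table -> pipeline
        "upstreamEdges": [{"fromEntity": "<inlet-table-id>", "toEntity": "<pipeline-id>"}],
        # pipeline -> outlet table
        "downstreamEdges": [{"fromEntity": "<pipeline-id>", "toEntity": "<outlet-table-id>"}],
    }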