Handle XLets in groups for AirflowLineageRunner (#10114)

* Handle XLets in groups

* Linting

* Linting
Pere Miquel Brull 2023-02-07 06:49:46 +01:00 committed by GitHub
parent b76ec0a18f
commit fb15c896b3
8 changed files with 365 additions and 138 deletions
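The heart of the change: `get_xlets_from_dag` used to return a single `XLets` container and now returns a list with one entry per xlet key, so a DAG can carry several independent lineage groups. A minimal sketch of the new contract, with made-up table names (the import path and class come from the diff below):

```
from metadata.ingestion.source.pipeline.airflow.lineage_parser import XLets

# Old contract: get_xlets_from_dag(dag) -> XLets
# New contract: get_xlets_from_dag(dag) -> List[XLets], e.g. when tasks
# declare both a "tables" and a "more_tables" key in their in/outlets:
expected = [
    XLets(inlets={"A"}, outlets={"B"}),       # grouped under "tables"
    XLets(inlets={"X"}, outlets={"Y", "Z"}),  # grouped under "more_tables"
]
```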

View File

@@ -24,7 +24,6 @@ from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow_provider_openmetadata.hooks.openmetadata import OpenMetadataHook

View File

@@ -23,8 +23,11 @@ from airflow_provider_openmetadata.lineage.config.loader import (
get_lineage_config,
)
from airflow_provider_openmetadata.lineage.runner import AirflowLineageRunner
from airflow_provider_openmetadata.lineage.xlets import XLets, get_xlets_from_dag
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.airflow.lineage_parser import (
XLets,
get_xlets_from_dag,
)
# pylint: disable=too-few-public-methods
@@ -70,13 +73,13 @@ class OpenMetadataLineageBackend(LineageBackend):
try:
config: AirflowLineageConfig = get_lineage_config()
metadata = OpenMetadata(config.metadata_config)
xlets: XLets = get_xlets_from_dag(context["dag"])
xlet_list: List[XLets] = get_xlets_from_dag(context["dag"])
runner = AirflowLineageRunner(
metadata=metadata,
service_name=config.airflow_service_name,
dag=context["dag"],
xlets=xlets,
xlets=xlet_list,
only_keep_dag_lineage=config.only_keep_dag_lineage,
max_status=config.max_status,
)
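For context, this code runs inside Airflow's lineage-backend hook: Airflow instantiates the configured `LineageBackend` subclass and calls its `send_lineage` method after task execution, which is where the `context["dag"]` access above comes from. A rough sketch of that contract, assuming Airflow 2.x (`SketchBackend` is illustrative; the real subclass here is `OpenMetadataLineageBackend`):

```
from airflow.lineage.backend import LineageBackend

class SketchBackend(LineageBackend):
    # Airflow calls this once per task, passing the task's declared
    # inlets/outlets and the task-instance context holding the DAG.
    def send_lineage(self, operator, inlets=None, outlets=None, context=None):
        dag = context["dag"]  # same access pattern as the diff above
```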

View File

@@ -13,16 +13,20 @@
OpenMetadata Airflow Lineage Operator
"""
import traceback
from typing import List
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context
from airflow_provider_openmetadata.lineage.runner import AirflowLineageRunner
from airflow_provider_openmetadata.lineage.xlets import XLets, get_xlets_from_dag
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.airflow.lineage_parser import (
XLets,
get_xlets_from_dag,
)
class OpenMetadataLineageOperator(BaseOperator):
@@ -57,13 +61,18 @@ class OpenMetadataLineageOperator(BaseOperator):
and push it to OpenMetadata using the Python Client.
"""
try:
xlets: XLets = get_xlets_from_dag(self.dag)
xlet_list: List[XLets] = get_xlets_from_dag(self.dag)
self.dag.log.info(
f"Extracted the following XLet data from the DAG: {xlet_list}"
)
metadata = OpenMetadata(self.server_config)
runner = AirflowLineageRunner(
metadata=metadata,
service_name=self.service_name,
dag=self.dag,
xlets=xlets,
xlets=xlet_list,
only_keep_dag_lineage=self.only_keep_dag_lineage,
max_status=self.max_status,
)
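A hedged usage sketch of the operator defined in this file. The constructor parameters are inferred from the attributes the diff reads (`self.server_config`, `self.service_name`, `self.only_keep_dag_lineage`, `self.max_status`); the host and service name are placeholders:

```
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)

OpenMetadataLineageOperator(
    task_id="lineage_op",
    server_config=OpenMetadataConnection(hostPort="http://localhost:8585/api"),
    service_name="airflow_lineage_service",  # placeholder service name
    only_keep_dag_lineage=True,
    dag=dag,  # assumes a surrounding `with DAG(...) as dag:` block
)
```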

View File

@@ -19,7 +19,6 @@ from airflow.configuration import conf
from pydantic import BaseModel
from airflow_provider_openmetadata.lineage.status import STATUS_MAP
from airflow_provider_openmetadata.lineage.xlets import XLets
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.services.createPipelineService import (
@@ -44,9 +43,10 @@ from metadata.generated.schema.entity.services.pipelineService import (
PipelineService,
PipelineServiceType,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.airflow.lineage_parser import XLets
from metadata.utils.helpers import datetime_to_ts
@@ -81,7 +81,7 @@ class AirflowLineageRunner:
metadata: OpenMetadata,
service_name: str,
dag: "DAG",
xlets: Optional[XLets] = None,
xlets: Optional[List[XLets]] = None,
only_keep_dag_lineage: bool = False,
max_status: int = 10,
):
@@ -248,37 +248,32 @@
Add the lineage from inlets and outlets
"""
for table_fqn in xlets.inlets or []:
    table_entity = self.metadata.get_by_name(entity=Table, fqn=table_fqn)
    try:
        lineage = AddLineageRequest(
            edge=EntitiesEdge(
                fromEntity=EntityReference(id=table_entity.id, type="table"),
                toEntity=EntityReference(id=pipeline.id, type="pipeline"),
            )
        )
        self.metadata.add_lineage(lineage)
    except AttributeError as err:
        self.dag.log.error(
            f"Error trying to access Entity data due to: {err}."
            f" Is the table [{table_fqn}] present in OpenMetadata?"
        )
for table_fqn in xlets.outlets or []:
    table_entity = self.metadata.get_by_name(entity=Table, fqn=table_fqn)
    try:
        lineage = AddLineageRequest(
            edge=EntitiesEdge(
                fromEntity=EntityReference(id=pipeline.id, type="pipeline"),
                toEntity=EntityReference(id=table_entity.id, type="table"),
            )
        )
        self.metadata.add_lineage(lineage)
    except AttributeError as err:
        self.dag.log.error(
            f"Error trying to access Entity data due to: {err}."
            f" Is the table [{table_fqn}] present in OpenMetadata?"
        )

lineage_details = LineageDetails(
    pipeline=EntityReference(id=pipeline.id, type="pipeline")
)
for fqn_in, fqn_out in zip(xlets.inlets, xlets.outlets):
    table_in: Optional[Table] = self.metadata.get_by_name(
        entity=Table, fqn=fqn_in
    )
    table_out: Optional[Table] = self.metadata.get_by_name(
        entity=Table, fqn=fqn_out
    )
    if table_in and table_out:
        try:
            lineage = AddLineageRequest(
                edge=EntitiesEdge(
                    fromEntity=EntityReference(id=table_in.id, type="table"),
                    toEntity=EntityReference(id=table_out.id, type="table"),
                    lineageDetails=lineage_details,
                ),
            )
            self.metadata.add_lineage(lineage)
        except AttributeError as err:
            self.dag.log.error(
                f"Error trying to compute lineage due to: {err}."
            )
def clean_lineage(self, pipeline: Pipeline, xlets: XLets):
"""
@@ -343,11 +338,13 @@
pipeline = self.create_pipeline_entity(pipeline_service)
self.add_all_pipeline_status(pipeline)
if self.xlets:
self.dag.log.info("Got some xlet data. Processing lineage...")
self.add_lineage(pipeline, self.xlets)
self.dag.log.info(f"Processing XLet data {self.xlets}")
for xlet in self.xlets or []:
self.dag.log.info(f"Got some xlet data. Processing lineage for {xlet}")
self.add_lineage(pipeline, xlet)
if self.only_keep_dag_lineage:
self.dag.log.info(
"`only_keep_dag_lineage` is set to True. Cleaning lineage not in inlets or outlets..."
)
self.clean_lineage(pipeline, self.xlets)
self.clean_lineage(pipeline, xlet)
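Note how the new `add_lineage` pairs inlet and outlet FQNs positionally. A small standalone illustration of that `zip` behavior on made-up FQNs; since `XLets` stores sets, the pairing depends on the groups lining up, and `zip` silently drops unmatched extras:

```
inlets = {"svc.db.schema.orders_raw"}
outlets = {"svc.db.schema.orders_clean"}

# One table-to-table edge per pair, each annotated with the owning
# pipeline via LineageDetails, as in add_lineage above.
for fqn_in, fqn_out in zip(inlets, outlets):
    print(f"{fqn_in} -> {fqn_out}")
```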

View File

@@ -1,85 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Handle Airflow inlets and outlets
"""
from typing import List, Optional, Set
from pydantic import BaseModel
class XLets(BaseModel):
"""
Group inlets and outlets from all tasks in a DAG
"""
inlets: Set[str]
outlets: Set[str]
def parse_xlets(xlet: List[dict]) -> Optional[List[str]]:
"""
Parse airflow xlets for V1
:param xlet: airflow v2 xlet dict
:return: table list or None
"""
if len(xlet) and isinstance(xlet[0], dict):
tables = xlet[0].get("tables")
if tables and isinstance(tables, list):
return tables
return None
def get_xlets_from_operator(
operator: "BaseOperator", xlet_mode: str = "_inlets"
) -> Optional[List[str]]:
"""
Given an Airflow DAG Task, obtain the tables
set in inlets or outlets.
We expect xlets to have the following structure:
[{'tables': ['FQN']}]
:param operator: task to get xlets from
:param xlet_mode: get inlet or outlet
:return: list of tables FQN
"""
xlet = getattr(operator, xlet_mode)
tables = parse_xlets(xlet)
if not tables:
operator.log.debug(f"Not finding proper {xlet_mode} in task {operator.task_id}")
else:
operator.log.info(f"Found {xlet_mode} {tables} in task {operator.task_id}")
return tables
def get_xlets_from_dag(dag: "DAG") -> XLets:
"""
Fill the inlets and outlets of the Pipeline by iterating
over all its tasks
"""
_inlets = set()
_outlets = set()
for task in dag.tasks:
_inlets.update(
get_xlets_from_operator(operator=task, xlet_mode="_inlets") or []
)
_outlets.update(
get_xlets_from_operator(operator=task, xlet_mode="_outlets") or []
)
return XLets(inlets=_inlets, outlets=_outlets)

View File

@@ -0,0 +1,147 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Handle Airflow inlets and outlets.
We can have different scenarios of inlets and outlets:
```
t1 = BashOperator(
task_id="print_date",
bash_command="date",
inlets={
"tables": ["A"]
},
)
t2 = BashOperator(
task_id="sleep",
bash_command="sleep 1",
outlets={
"tables": ["B"]
},
)
```
we'll join the keys and get XLet(inlets=[A], outlets=[B])
But we can also have a pipeline working on different sets of tables:
```
t1 = BashOperator(
task_id="print_date",
bash_command="date",
inlets={
"tables": ["A"],
"more_tables": ["X"]
},
)
t2 = BashOperator(
task_id="sleep",
bash_command="sleep 1",
outlets={
"tables": ["B"],
"more_tables": ["Y", "Z"]
},
)
```
we'll join the keys and get [
XLet(inlets=[A], outlets=[B]),
XLet(inlets=[X], outlets=[Y, Z]),
]
and we'll treat this as independent sets of lineage
"""
from typing import Dict, List, Optional, Set
from pydantic import BaseModel
INLETS_ATTR = "_inlets"
OUTLETS_ATTR = "_outlets"
class XLets(BaseModel):
"""
Group inlets and outlets from all tasks in a DAG
"""
inlets: Set[str]
outlets: Set[str]
def parse_xlets(xlet: List[dict]) -> Optional[Dict[str, List[str]]]:
"""
Parse airflow xlets for V1
:param xlet: airflow v2 xlet dict
:return: dictionary of xlet list or None
"""
if isinstance(xlet, list) and len(xlet) and isinstance(xlet[0], dict):
xlet_dict = xlet[0]
return {
key: value for key, value in xlet_dict.items() if isinstance(value, list)
}
return None
def get_xlets_from_operator(
operator: "BaseOperator", xlet_mode: str = INLETS_ATTR
) -> Optional[Dict[str, List[str]]]:
"""
Given an Airflow DAG Task, obtain the tables
set in inlets or outlets.
We expect xlets to have the following structure:
[{'tables': ['FQN']}]
:param operator: task to get xlets from
:param xlet_mode: get inlet or outlet
:return: list of tables FQN
"""
xlet = getattr(operator, xlet_mode)
xlet_data = parse_xlets(xlet)
if not xlet_data:
operator.log.debug(f"Not finding proper {xlet_mode} in task {operator.task_id}")
else:
operator.log.info(f"Found {xlet_mode} {xlet_data} in task {operator.task_id}")
return xlet_data
def get_xlets_from_dag(dag: "DAG") -> List[XLets]:
"""
Fill the inlets and outlets of the Pipeline by iterating
over all its tasks
"""
_inlets = {}
_outlets = {}
# First, grab all the inlets and outlets from all tasks grouped by keys
for task in dag.tasks:
_inlets.update(
get_xlets_from_operator(operator=task, xlet_mode=INLETS_ATTR) or []
)
_outlets.update(
get_xlets_from_operator(operator=task, xlet_mode=OUTLETS_ATTR) or []
)
# We expect to have the same keys in both inlets and outlets dicts
# We will then iterate over the inlet keys to build the list of XLets
return [
XLets(inlets=set(value), outlets=set(_outlets[key]))
for key, value in _inlets.items()
if value and _outlets.get(key)
]
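To make the grouping rule concrete, a quick sketch of `parse_xlets` on a made-up payload; values that are not lists are discarded, and each surviving key later becomes its own `XLets` entry:

```
from metadata.ingestion.source.pipeline.airflow.lineage_parser import parse_xlets

raw = [{"tables": ["A"], "more_tables": ["X"], "owner": "me"}]

parse_xlets(raw)
# -> {"tables": ["A"], "more_tables": ["X"]}  ("owner" dropped: not a list)
```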

View File

@@ -12,7 +12,10 @@
"""
Test airflow lineage operator and hook.
This test is coupled with the example DAG `lineage_tutorial_operator`
This test is coupled with the example DAG `lineage_tutorial_operator`.
With the `docker compose up` setup, you can debug the progress
by setting breakpoints in this file.
"""
import time
from datetime import datetime, timedelta
@@ -31,7 +34,7 @@ from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.pipeline import Pipeline, StatusType
from metadata.generated.schema.entity.data.table import Column, DataType
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
@@ -279,23 +282,25 @@ class AirflowLineageTest(TestCase):
get_task_status_type_by_name(pipeline, "templated"), StatusType.Successful
)
@pytest.mark.order(2)
@pytest.mark.order(3)
def test_pipeline_lineage(self) -> None:
"""
Validate that the pipeline has proper lineage
"""
lineage = self.metadata.get_lineage_by_name(
entity=Pipeline,
fqn=f"{PIPELINE_SERVICE_NAME}.{OM_LINEAGE_DAG_NAME}",
entity=Table,
fqn="test-service-table-lineage.test-db.test-schema.lineage-test-inlet",
)
node_names = set((node["name"] for node in lineage.get("nodes") or []))
self.assertEqual(node_names, {"lineage-test-inlet", "lineage-test-outlet"})
self.assertEqual(len(lineage.get("upstreamEdges")), 1)
self.assertEqual(node_names, {"lineage-test-outlet"})
self.assertEqual(len(lineage.get("downstreamEdges")), 1)
self.assertEqual(
lineage["upstreamEdges"][0]["fromEntity"], str(self.table_inlet.id.__root__)
)
self.assertEqual(
lineage["downstreamEdges"][0]["toEntity"],
str(self.table_outlet.id.__root__),
)
self.assertEqual(
lineage["downstreamEdges"][0]["lineageDetails"]["pipeline"][
"fullyQualifiedName"
],
f"{PIPELINE_SERVICE_NAME}.{OM_LINEAGE_DAG_NAME}",
)

View File

@@ -0,0 +1,152 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test lineage parser to get inlets and outlets information
"""
from datetime import datetime
from unittest import TestCase
from airflow import DAG
from airflow.operators.bash import BashOperator
from metadata.ingestion.source.pipeline.airflow.lineage_parser import (
INLETS_ATTR,
OUTLETS_ATTR,
XLets,
get_xlets_from_dag,
get_xlets_from_operator,
parse_xlets,
)
class TestAirflowLineageParser(TestCase):
"""
Handle airflow lineage parser validations
"""
def test_parse_xlets(self):
"""
Handle the shape validation of inlets and outlets, e.g.,
[{
"tables": ["A"],
"more_tables": ["X"]
}],
"""
raw_xlet = [{"tables": ["A"], "more_tables": ["X"]}]
self.assertEqual(parse_xlets(raw_xlet), {"tables": ["A"], "more_tables": ["X"]})
raw_xlet_without_list = [{"tables": ["A"], "more_tables": "random"}]
self.assertEqual(
parse_xlets(raw_xlet_without_list),
{
"tables": ["A"],
},
)
not_an_xlet_list = {"tables": ["A"]}
self.assertIsNone(parse_xlets(not_an_xlet_list))
def test_get_xlets_from_operator(self):
"""
Check how to get xlet data from operators
"""
operator = BashOperator(
task_id="print_date",
bash_command="date",
outlets={"tables": ["A"]},
)
# By default we try with inlets. There are none here
self.assertIsNone(get_xlets_from_operator(operator))
# But the outlets are parsed correctly
self.assertEqual(
get_xlets_from_operator(operator, xlet_mode=OUTLETS_ATTR), {"tables": ["A"]}
)
operator = BashOperator(
task_id="print_date",
bash_command="date",
inlets={"tables": ["A"], "more_tables": ["X"]},
)
self.assertEqual(
get_xlets_from_operator(operator, xlet_mode=INLETS_ATTR),
{"tables": ["A"], "more_tables": ["X"]},
)
self.assertIsNone(get_xlets_from_operator(operator, xlet_mode=OUTLETS_ATTR))
def test_get_xlets_from_dag(self):
"""
Check that we can properly join the xlet information from
all operators in the DAG
"""
with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
BashOperator(
task_id="print_date",
bash_command="date",
inlets={"tables": ["A"]},
)
BashOperator(
task_id="sleep",
bash_command="sleep 1",
outlets={"tables": ["B"]},
)
self.assertEqual(
get_xlets_from_dag(dag), [XLets(inlets={"A"}, outlets={"B"})]
)
with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
BashOperator(
task_id="print_date",
bash_command="date",
inlets={"tables": ["A", "A"], "this is a bit random": "foo"},
)
BashOperator(
task_id="sleep",
bash_command="sleep 1",
outlets={"tables": ["B"]},
)
self.assertEqual(
get_xlets_from_dag(dag), [XLets(inlets={"A"}, outlets={"B"})]
)
with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
BashOperator(
task_id="print_date",
bash_command="date",
inlets={
"tables": ["A", "A"],
"more_tables": ["X", "Y"],
"this is a bit random": "foo",
},
)
BashOperator(
task_id="sleep",
bash_command="sleep 1",
outlets={
"tables": ["B"],
"more_tables": ["Z"],
},
)
self.assertEqual(
get_xlets_from_dag(dag),
[
XLets(inlets={"A"}, outlets={"B"}),
XLets(inlets={"X", "Y"}, outlets={"Z"}),
],
)