Mirror of https://github.com/open-metadata/OpenMetadata.git (synced 2025-09-01 13:13:10 +00:00)
* Add OMEntity model
* Test OMEntity
* Update repr
* Fix __str__
* Add entity ref map
* Test serializer for backend
* Fix tests
* Fix serializer
* Test runner
* Add runner tests
* Update docs
* Format
This commit is contained in:
parent f57e429eb5
commit 7fcdf08ca4
@ -358,6 +358,13 @@ services:
|
||||
DB_SCHEME: ${AIRFLOW_DB_SCHEME:-mysql+pymysql}
|
||||
DB_USER: ${AIRFLOW_DB_USER:-airflow_user}
|
||||
DB_PASSWORD: ${AIRFLOW_DB_PASSWORD:-airflow_pass}
|
||||
|
||||
# To test the lineage backend
|
||||
# AIRFLOW__LINEAGE__BACKEND: airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend
|
||||
# AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME: local_airflow
|
||||
# AIRFLOW__LINEAGE__OPENMETADATA_API_ENDPOINT: http://openmetadata-server:8585/api
|
||||
# AIRFLOW__LINEAGE__JWT_TOKEN: ...
|
||||
|
||||
entrypoint: /bin/bash
|
||||
command:
|
||||
- "/opt/airflow/ingestion_dependency.sh"
|
||||
|
@ -24,6 +24,10 @@ from datetime import timedelta
|
||||
from airflow.decorators import dag, task
|
||||
from airflow.utils.dates import days_ago
|
||||
|
||||
from metadata.generated.schema.entity.data.container import Container
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
from metadata.ingestion.source.pipeline.airflow.lineage_parser import OMEntity
|
||||
|
||||
default_args = {
|
||||
"owner": "openmetadata_airflow_example",
|
||||
"depends_on_past": False,
|
||||
@ -45,7 +49,6 @@ def openmetadata_airflow_lineage_example():
|
||||
inlets={
|
||||
"tables": [
|
||||
"sample_data.ecommerce_db.shopify.raw_order",
|
||||
"sample_data.ecommerce_db.shopify.raw_customer",
|
||||
],
|
||||
},
|
||||
outlets={"tables": ["sample_data.ecommerce_db.shopify.fact_order"]},
|
||||
@ -53,7 +56,23 @@ def openmetadata_airflow_lineage_example():
|
||||
def generate_data():
|
||||
pass
|
||||
|
||||
@task(
|
||||
inlets=[
|
||||
OMEntity(entity=Container, fqn="s3_storage_sample.transactions", key="test")
|
||||
],
|
||||
outlets=[
|
||||
OMEntity(
|
||||
entity=Table,
|
||||
fqn="sample_data.ecommerce_db.shopify.raw_order",
|
||||
key="test",
|
||||
)
|
||||
],
|
||||
)
|
||||
def generate_data2():
|
||||
pass
|
||||
|
||||
generate_data()
|
||||
generate_data2()
|
||||
|
||||
|
||||
openmetadata_airflow_lineage_example_dag = openmetadata_airflow_lineage_example()
|
||||
|
@ -114,14 +114,17 @@ base_requirements = {
|
||||
"sqlalchemy>=1.4.0,<2",
|
||||
"collate-sqllineage>=1.0.4",
|
||||
"tabulate==0.9.0",
|
||||
"typing_extensions<=4.5.0", # We need to have this fixed due to a yanked release 4.6.0
|
||||
"typing_extensions>=4.8.0",
|
||||
"typing-inspect",
|
||||
"wheel~=0.38.4",
|
||||
}
|
||||
|
||||
|
||||
plugins: Dict[str, Set[str]] = {
|
||||
"airflow": {VERSIONS["airflow"]}, # Same as ingestion container. For development.
|
||||
"airflow": {
|
||||
VERSIONS["airflow"],
|
||||
"attrs",
|
||||
}, # Same as ingestion container. For development.
|
||||
"amundsen": {VERSIONS["neo4j"]},
|
||||
"athena": {"pyathena==3.0.8"},
|
||||
"atlas": {},
|
||||
|
@ -64,14 +64,17 @@ class OpenMetadataLineageBackend(LineageBackend):
|
||||
"""
|
||||
|
||||
try:
|
||||
dag = context["dag"]
|
||||
dag.log.info("Executing OpenMetadata Lineage Backend...")
|
||||
|
||||
config: AirflowLineageConfig = get_lineage_config()
|
||||
xlet_list: List[XLets] = get_xlets_from_dag(dag)
|
||||
metadata = OpenMetadata(config.metadata_config)
|
||||
xlet_list: List[XLets] = get_xlets_from_dag(context["dag"])
|
||||
|
||||
runner = AirflowLineageRunner(
|
||||
metadata=metadata,
|
||||
service_name=config.airflow_service_name,
|
||||
dag=context["dag"],
|
||||
dag=dag,
|
||||
xlets=xlet_list,
|
||||
only_keep_dag_lineage=config.only_keep_dag_lineage,
|
||||
max_status=config.max_status,
|
||||
|
@ -47,6 +47,7 @@ from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDe
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.pipeline.airflow.lineage_parser import XLets
|
||||
from metadata.utils.constants import ENTITY_REFERENCE_TYPE_MAP
|
||||
from metadata.utils.helpers import clean_uri, datetime_to_ts
|
||||
|
||||
|
||||
@ -251,37 +252,47 @@ class AirflowLineageRunner:
|
||||
"""
|
||||
|
||||
lineage_details = LineageDetails(
|
||||
pipeline=EntityReference(id=pipeline.id, type="pipeline")
|
||||
pipeline=EntityReference(
|
||||
id=pipeline.id, type=ENTITY_REFERENCE_TYPE_MAP[Pipeline.__name__]
|
||||
)
|
||||
)
|
||||
|
||||
for from_fqn in xlets.inlets or []:
|
||||
for from_xlet in xlets.inlets or []:
|
||||
from_entity: Optional[Table] = self.metadata.get_by_name(
|
||||
entity=Table, fqn=from_fqn
|
||||
entity=from_xlet.entity, fqn=from_xlet.fqn
|
||||
)
|
||||
if from_entity:
|
||||
for to_fqn in xlets.outlets or []:
|
||||
for to_xlet in xlets.outlets or []:
|
||||
to_entity: Optional[Table] = self.metadata.get_by_name(
|
||||
entity=Table, fqn=to_fqn
|
||||
entity=to_xlet.entity, fqn=to_xlet.fqn
|
||||
)
|
||||
if to_entity:
|
||||
lineage = AddLineageRequest(
|
||||
edge=EntitiesEdge(
|
||||
fromEntity=EntityReference(
|
||||
id=from_entity.id, type="table"
|
||||
id=from_entity.id,
|
||||
type=ENTITY_REFERENCE_TYPE_MAP[
|
||||
from_xlet.entity.__name__
|
||||
],
|
||||
),
|
||||
toEntity=EntityReference(
|
||||
id=to_entity.id,
|
||||
type=ENTITY_REFERENCE_TYPE_MAP[
|
||||
to_xlet.entity.__name__
|
||||
],
|
||||
),
|
||||
toEntity=EntityReference(id=to_entity.id, type="table"),
|
||||
lineageDetails=lineage_details,
|
||||
)
|
||||
)
|
||||
self.metadata.add_lineage(lineage)
|
||||
else:
|
||||
self.dag.log.warning(
|
||||
f"Could not find Table [{to_fqn}] from "
|
||||
f"Could not find [{to_xlet.entity.__name__}] [{to_xlet.fqn}] from "
|
||||
f"[{pipeline.fullyQualifiedName.__root__}] outlets"
|
||||
)
|
||||
else:
|
||||
self.dag.log.warning(
|
||||
f"Could not find Table [{from_fqn}] from "
|
||||
f"Could not find [{from_xlet.entity.__name__}] [{from_xlet.fqn}] from "
|
||||
f"[{pipeline.fullyQualifiedName.__root__}] inlets"
|
||||
)
|
||||
|
||||
@ -305,7 +316,8 @@ class AirflowLineageRunner:
|
||||
for node in lineage_data.get("nodes") or []
|
||||
if node["id"] == upstream_edge["fromEntity"]
|
||||
and node["type"] == "table"
|
||||
)
|
||||
),
|
||||
None,
|
||||
)
|
||||
for upstream_edge in lineage_data.get("upstreamEdges") or []
|
||||
]
|
||||
@ -316,26 +328,37 @@ class AirflowLineageRunner:
|
||||
for node in lineage_data.get("nodes") or []
|
||||
if node["id"] == downstream_edge["toEntity"]
|
||||
and node["type"] == "table"
|
||||
)
|
||||
),
|
||||
None,
|
||||
)
|
||||
for downstream_edge in lineage_data.get("downstreamEdges") or []
|
||||
]
|
||||
|
||||
for edge in upstream_edges:
|
||||
if edge.fqn not in xlets.inlets:
|
||||
for edge in upstream_edges or []:
|
||||
if edge.fqn not in (inlet.fqn for inlet in xlets.inlets):
|
||||
self.dag.log.info(f"Removing upstream edge with {edge.fqn}")
|
||||
edge_to_remove = EntitiesEdge(
|
||||
fromEntity=EntityReference(id=edge.id, type="table"),
|
||||
toEntity=EntityReference(id=pipeline.id, type="pipeline"),
|
||||
fromEntity=EntityReference(
|
||||
id=edge.id, type=ENTITY_REFERENCE_TYPE_MAP[Table.__name__]
|
||||
),
|
||||
toEntity=EntityReference(
|
||||
id=pipeline.id,
|
||||
type=ENTITY_REFERENCE_TYPE_MAP[Pipeline.__name__],
|
||||
),
|
||||
)
|
||||
self.metadata.delete_lineage_edge(edge=edge_to_remove)
|
||||
|
||||
for edge in downstream_edges:
|
||||
if edge.fqn not in xlets.outlets:
|
||||
for edge in downstream_edges or []:
|
||||
if edge.fqn not in (outlet.fqn for outlet in xlets.outlets):
|
||||
self.dag.log.info(f"Removing downstream edge with {edge.fqn}")
|
||||
edge_to_remove = EntitiesEdge(
|
||||
fromEntity=EntityReference(id=pipeline.id, type="pipeline"),
|
||||
toEntity=EntityReference(id=edge.id, type="table"),
|
||||
fromEntity=EntityReference(
|
||||
id=pipeline.id,
|
||||
type=ENTITY_REFERENCE_TYPE_MAP[Pipeline.__name__],
|
||||
),
|
||||
toEntity=EntityReference(
|
||||
id=edge.id, type=ENTITY_REFERENCE_TYPE_MAP[Table.__name__]
|
||||
),
|
||||
)
|
||||
self.metadata.delete_lineage_edge(edge=edge_to_remove)
|
||||
|
||||
|
@ -62,13 +62,23 @@ we'll join the keys and get [
|
||||
]
|
||||
and we'll treat this as independent sets of lineage
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
import traceback
|
||||
from collections import defaultdict
|
||||
from copy import deepcopy
|
||||
from enum import Enum
|
||||
from typing import Dict, List, Optional, Set
|
||||
from functools import singledispatch
|
||||
from typing import Any, DefaultDict, Dict, List, Optional, Type
|
||||
|
||||
import attr
|
||||
from pydantic import BaseModel
|
||||
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
from metadata.ingestion.ometa.models import T
|
||||
from metadata.utils.deprecation import deprecated
|
||||
from metadata.utils.importer import import_from_module
|
||||
|
||||
logger = logging.getLogger("airflow.task")
|
||||
|
||||
|
||||
@ -85,41 +95,207 @@ class XLetsAttr(Enum):
|
||||
PRIVATE_OUTLETS = "_outlets"
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True, kw_only=True)
|
||||
class OMEntity:
|
||||
"""
|
||||
Identifies one entity in OpenMetadata.
|
||||
We use attr annotated object similar to https://github.com/apache/airflow/blob/main/airflow/lineage/entities.py
|
||||
based on https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/lineage.html
|
||||
"""
|
||||
|
||||
# Entity Type, such as Table, Container or Dashboard.
|
||||
entity: Type[T] = attr.ib()
|
||||
# Entity Fully Qualified Name, e.g., service.database.schema.table
|
||||
fqn: str = attr.ib()
|
||||
# We will use the key in case we need to group different lineages from the same DAG
|
||||
key: str = "default"
|
||||
|
||||
def __str__(self):
|
||||
"""Custom serialization"""
|
||||
_dict = deepcopy(self.__dict__)
|
||||
_dict["entity"] = f"{self.entity.__module__}.{self.entity.__name__}"
|
||||
return json.dumps(_dict)
|
||||
|
||||
def serialize(self) -> str:
|
||||
"""Custom serialization to be called in airflow internals"""
|
||||
return str(self)
|
||||
|
||||
|
||||
class XLets(BaseModel):
|
||||
"""
|
||||
Group inlets and outlets from all tasks in a DAG
|
||||
"""
|
||||
|
||||
inlets: Set[str]
|
||||
outlets: Set[str]
|
||||
inlets: List[OMEntity]
|
||||
outlets: List[OMEntity]
|
||||
|
||||
class Config:
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
|
||||
def parse_xlets(xlet: List[dict]) -> Optional[Dict[str, List[str]]]:
|
||||
def concat_dict_values(
|
||||
d1: DefaultDict[str, List[Any]], d2: Optional[Dict[str, List[Any]]]
|
||||
) -> DefaultDict[str, List[Any]]:
|
||||
"""
|
||||
Update d1 based on d2 values concatenating their results.
|
||||
"""
|
||||
if d2:
|
||||
for key, value in d2.items():
|
||||
d1[key] = d1[key] + value
|
||||
|
||||
return d1
|
||||
|
||||
|
||||
def parse_xlets(xlet: List[Any]) -> Optional[Dict[str, List[OMEntity]]]:
|
||||
"""
|
||||
Parse airflow xlets for V1
|
||||
:param xlet: airflow v2 xlet dict
|
||||
:return: dictionary of xlet list or None
|
||||
|
||||
[{'__var': {'tables': ['sample_data.ecommerce_db.shopify.fact_order']},
|
||||
'__type': 'dict'}]
|
||||
If our operators are like
|
||||
```
|
||||
BashOperator(
|
||||
task_id="print_date",
|
||||
bash_command="date",
|
||||
inlets={"tables": ["A"]},
|
||||
)
|
||||
```
|
||||
the inlets/outlets will still be processed in airflow as a `List`.
|
||||
|
||||
Note that when picking them up from Serialized DAGs, the shape is:
|
||||
```
|
||||
[{'__var': {'tables': ['sample_data.ecommerce_db.shopify.fact_order']}, '__type': 'dict'}]
|
||||
```
|
||||
|
||||
If using Datasets, we get something like:
|
||||
```
|
||||
[Dataset(uri='s3://dataset-bucket/input.csv', extra=None)]
|
||||
```
|
||||
We need to figure out how we want to handle information coming in this format.
|
||||
"""
|
||||
# This branch is for lineage parser op
|
||||
if isinstance(xlet, list) and len(xlet) and isinstance(xlet[0], dict):
|
||||
xlet_dict = xlet[0]
|
||||
# This is how the Serialized DAG is giving us the info from _inlets & _outlets
|
||||
if isinstance(xlet_dict, dict) and xlet_dict.get("__var"):
|
||||
xlet_dict = xlet_dict["__var"]
|
||||
return {
|
||||
key: value for key, value in xlet_dict.items() if isinstance(value, list)
|
||||
}
|
||||
if isinstance(xlet, list) and len(xlet):
|
||||
_parsed_xlets = defaultdict(list)
|
||||
for element in xlet:
|
||||
parsed_element = _parse_xlets(element) or {}
|
||||
|
||||
# Update our xlet dict based on each parsed element
|
||||
# Since we can get a list of elements, concatenate the results from multiple xlets
|
||||
_parsed_xlets = concat_dict_values(_parsed_xlets, parsed_element)
|
||||
|
||||
return _parsed_xlets
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@singledispatch
|
||||
def _parse_xlets(xlet: Any) -> None:
|
||||
"""
|
||||
Default behavior to handle lineage.
|
||||
|
||||
We can use this function to register further inlets/outlets
|
||||
representations, e.g., https://github.com/open-metadata/OpenMetadata/issues/11626
|
||||
"""
|
||||
logger.warning(f"Inlet/Outlet type {type(xlet)} is not supported.")
|
||||
|
||||
|
||||
@_parse_xlets.register
|
||||
@deprecated(
|
||||
message="Please update your inlets/outlets to follow <TODO DOCS>",
|
||||
release="1.4.0",
|
||||
)
|
||||
def dictionary_lineage_annotation(xlet: dict) -> Dict[str, List[OMEntity]]:
|
||||
"""
|
||||
Handle OM specific inlet/outlet information. E.g.,
|
||||
|
||||
```
|
||||
BashOperator(
|
||||
task_id="print_date",
|
||||
bash_command="date",
|
||||
inlets={
|
||||
"tables": ["A", "A"],
|
||||
"more_tables": ["X", "Y"],
|
||||
"this is a bit random": "foo",
|
||||
},
|
||||
)
|
||||
```
|
||||
"""
|
||||
xlet_dict = xlet
|
||||
# This is how the Serialized DAG is giving us the info from _inlets & _outlets
|
||||
if isinstance(xlet_dict, dict) and xlet_dict.get("__var"):
|
||||
xlet_dict = xlet_dict["__var"]
|
||||
|
||||
return {
|
||||
key: [
|
||||
# We will convert the old dict lineage method into Tables
|
||||
OMEntity(entity=Table, fqn=fqn)
|
||||
for fqn in set(value) # Remove duplicates
|
||||
]
|
||||
for key, value in xlet_dict.items()
|
||||
if isinstance(value, list)
|
||||
}
|
||||
|
||||
|
||||
@_parse_xlets.register
|
||||
def _(xlet: OMEntity) -> Optional[Dict[str, List[OMEntity]]]:
|
||||
"""
|
||||
Handle OM specific inlet/outlet information. E.g.,
|
||||
|
||||
```
|
||||
BashOperator(
|
||||
task_id="sleep",
|
||||
bash_command=SLEEP,
|
||||
outlets=[OMEntity(entity=Table, fqn="B")],
|
||||
)
|
||||
```
|
||||
"""
|
||||
return {xlet.key: [xlet]}
|
||||
|
||||
|
||||
@_parse_xlets.register
|
||||
def _(xlet: str) -> Optional[Dict[str, List[OMEntity]]]:
|
||||
"""
|
||||
Handle OM specific inlet/outlet information. E.g.,
|
||||
|
||||
```
|
||||
BashOperator(
|
||||
task_id="sleep",
|
||||
bash_command=SLEEP,
|
||||
outlets=[OMEntity(entity=Table, fqn="B")],
|
||||
)
|
||||
```
|
||||
|
||||
Once a DAG is serialized, the xlet info will be stored as:
|
||||
```
|
||||
['{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}']
|
||||
```
|
||||
based on our custom serialization logic.
|
||||
|
||||
In this method, we need to revert this back to the actual instance of OMEntity.
|
||||
Note that we need to properly validate that the string is following the constraints of:
|
||||
- Being a JSON representation
|
||||
- Following the structure of an OMEntity
|
||||
|
||||
Otherwise, we could be having any other attr-based xlet native from Airflow.
|
||||
"""
|
||||
try:
|
||||
body = json.loads(xlet)
|
||||
om_entity = OMEntity(
|
||||
entity=import_from_module(body.get("entity")),
|
||||
fqn=body.get("fqn"),
|
||||
key=body.get("key"),
|
||||
)
|
||||
|
||||
return {om_entity.key: [om_entity]}
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
f"We could not parse the inlet/outlet information from [{xlet}] due to [{exc}]"
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def get_xlets_from_operator(
|
||||
operator: "BaseOperator", xlet_mode: XLetsMode
|
||||
) -> Optional[Dict[str, List[str]]]:
|
||||
) -> Optional[Dict[str, List[OMEntity]]]:
|
||||
"""
|
||||
Given an Airflow DAG Task, obtain the tables
|
||||
set in inlets or outlets.
|
||||
@ -166,25 +342,26 @@ def get_xlets_from_dag(dag: "DAG") -> List[XLets]:
|
||||
Fill the inlets and outlets of the Pipeline by iterating
|
||||
over all its tasks
|
||||
"""
|
||||
_inlets = {}
|
||||
_outlets = {}
|
||||
_inlets = defaultdict(list)
|
||||
_outlets = defaultdict(list)
|
||||
|
||||
# First, grab all the inlets and outlets from all tasks grouped by keys
|
||||
for task in dag.tasks:
|
||||
try:
|
||||
_inlets.update(
|
||||
_inlets = concat_dict_values(
|
||||
_inlets,
|
||||
get_xlets_from_operator(
|
||||
operator=task,
|
||||
xlet_mode=XLetsMode.INLETS,
|
||||
)
|
||||
or []
|
||||
),
|
||||
)
|
||||
_outlets.update(
|
||||
|
||||
_outlets = concat_dict_values(
|
||||
_outlets,
|
||||
get_xlets_from_operator(
|
||||
operator=task,
|
||||
xlet_mode=XLetsMode.OUTLETS,
|
||||
)
|
||||
or []
|
||||
),
|
||||
)
|
||||
|
||||
except Exception as exc:
|
||||
@ -197,7 +374,7 @@ def get_xlets_from_dag(dag: "DAG") -> List[XLets]:
|
||||
# We expect to have the same keys in both inlets and outlets dicts
|
||||
# We will then iterate over the inlet keys to build the list of XLets
|
||||
return [
|
||||
XLets(inlets=set(value), outlets=set(_outlets[key]))
|
||||
XLets(inlets=value, outlets=_outlets[key])
|
||||
for key, value in _inlets.items()
|
||||
if value and _outlets.get(key)
|
||||
]
|
||||
|
@ -30,7 +30,6 @@ from metadata.generated.schema.entity.data.pipeline import (
|
||||
Task,
|
||||
TaskStatus,
|
||||
)
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
from metadata.generated.schema.entity.services.connections.pipeline.airflowConnection import (
|
||||
AirflowConnection,
|
||||
)
|
||||
@ -45,7 +44,10 @@ from metadata.ingestion.api.steps import InvalidSourceException
|
||||
from metadata.ingestion.connections.session import create_and_bind_session
|
||||
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.pipeline.airflow.lineage_parser import get_xlets_from_dag
|
||||
from metadata.ingestion.source.pipeline.airflow.lineage_parser import (
|
||||
XLets,
|
||||
get_xlets_from_dag,
|
||||
)
|
||||
from metadata.ingestion.source.pipeline.airflow.models import (
|
||||
AirflowDag,
|
||||
AirflowDagDetails,
|
||||
@ -53,6 +55,7 @@ from metadata.ingestion.source.pipeline.airflow.models import (
|
||||
from metadata.ingestion.source.pipeline.airflow.utils import get_schedule_interval
|
||||
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
|
||||
from metadata.utils import fqn
|
||||
from metadata.utils.constants import ENTITY_REFERENCE_TYPE_MAP
|
||||
from metadata.utils.helpers import clean_uri, datetime_to_ts
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
|
||||
@ -264,7 +267,7 @@ class AirflowSource(PipelineServiceSource):
|
||||
SerializedDagModel.dag_id,
|
||||
json_data_column,
|
||||
SerializedDagModel.fileloc,
|
||||
).all():
|
||||
).yield_per(100):
|
||||
try:
|
||||
data = serialized_dag[1]["dag"]
|
||||
dag = AirflowDagDetails(
|
||||
@ -429,25 +432,40 @@ class AirflowSource(PipelineServiceSource):
|
||||
return
|
||||
|
||||
lineage_details = LineageDetails(
|
||||
pipeline=EntityReference(id=pipeline_entity.id.__root__, type="pipeline"),
|
||||
pipeline=EntityReference(
|
||||
id=pipeline_entity.id.__root__,
|
||||
type=ENTITY_REFERENCE_TYPE_MAP[Pipeline.__name__],
|
||||
),
|
||||
source=LineageSource.PipelineLineage,
|
||||
)
|
||||
|
||||
xlets = get_xlets_from_dag(dag=pipeline_details) if pipeline_details else []
|
||||
xlets: List[XLets] = (
|
||||
get_xlets_from_dag(dag=pipeline_details) if pipeline_details else []
|
||||
)
|
||||
for xlet in xlets:
|
||||
for from_fqn in xlet.inlets or []:
|
||||
from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn)
|
||||
for from_xlet in xlet.inlets or []:
|
||||
from_entity = self.metadata.get_by_name(
|
||||
entity=from_xlet.entity, fqn=from_xlet.fqn
|
||||
)
|
||||
if from_entity:
|
||||
for to_fqn in xlet.outlets or []:
|
||||
to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn)
|
||||
for to_xlet in xlet.outlets or []:
|
||||
to_entity = self.metadata.get_by_name(
|
||||
entity=to_xlet.entity, fqn=to_xlet.fqn
|
||||
)
|
||||
if to_entity:
|
||||
lineage = AddLineageRequest(
|
||||
edge=EntitiesEdge(
|
||||
fromEntity=EntityReference(
|
||||
id=from_entity.id, type="table"
|
||||
id=from_entity.id,
|
||||
type=ENTITY_REFERENCE_TYPE_MAP[
|
||||
from_xlet.entity.__name__
|
||||
],
|
||||
),
|
||||
toEntity=EntityReference(
|
||||
id=to_entity.id, type="table"
|
||||
id=to_entity.id,
|
||||
type=ENTITY_REFERENCE_TYPE_MAP[
|
||||
to_xlet.entity.__name__
|
||||
],
|
||||
),
|
||||
lineageDetails=lineage_details,
|
||||
)
|
||||
@ -455,12 +473,12 @@ class AirflowSource(PipelineServiceSource):
|
||||
yield Either(right=lineage)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Could not find Table [{to_fqn}] from "
|
||||
f"Could not find [{to_xlet.entity.__name__}] [{to_xlet.fqn}] from "
|
||||
f"[{pipeline_entity.fullyQualifiedName.__root__}] outlets"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Could not find Table [{from_fqn}] from "
|
||||
f"Could not find [{from_xlet.entity.__name__}] [{from_xlet.fqn}] from "
|
||||
f"[{pipeline_entity.fullyQualifiedName.__root__}] inlets"
|
||||
)
|
||||
|
||||
|
@ -12,6 +12,26 @@
|
||||
"""
|
||||
Define constants useful for the metadata ingestion
|
||||
"""
|
||||
from metadata.generated.schema.entity.data.chart import Chart
|
||||
from metadata.generated.schema.entity.data.container import Container
|
||||
from metadata.generated.schema.entity.data.dashboard import Dashboard
|
||||
from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDataModel
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||
from metadata.generated.schema.entity.data.mlmodel import MlModel
|
||||
from metadata.generated.schema.entity.data.pipeline import Pipeline
|
||||
from metadata.generated.schema.entity.data.searchIndex import SearchIndex
|
||||
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
from metadata.generated.schema.entity.data.topic import Topic
|
||||
from metadata.generated.schema.entity.services.dashboardService import DashboardService
|
||||
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
||||
from metadata.generated.schema.entity.services.messagingService import MessagingService
|
||||
from metadata.generated.schema.entity.services.metadataService import MetadataService
|
||||
from metadata.generated.schema.entity.services.mlmodelService import MlModelService
|
||||
from metadata.generated.schema.entity.services.pipelineService import PipelineService
|
||||
from metadata.generated.schema.entity.services.searchService import SearchService
|
||||
from metadata.generated.schema.entity.services.storageService import StorageService
|
||||
|
||||
DOT = "_DOT_"
|
||||
TEN_MIN = 10 * 60
|
||||
@ -47,3 +67,29 @@ AUTHORIZATION_HEADER = "Authorization"
|
||||
NO_ACCESS_TOKEN = "no_token"
|
||||
|
||||
SAMPLE_DATA_DEFAULT_COUNT = 50
|
||||
|
||||
# Mainly used for lineage
|
||||
ENTITY_REFERENCE_TYPE_MAP = {
|
||||
# Service Entities
|
||||
DatabaseService.__name__: "databaseService",
|
||||
MessagingService.__name__: "messagingService",
|
||||
DashboardService.__name__: "dashboardService",
|
||||
PipelineService.__name__: "pipelineService",
|
||||
StorageService.__name__: "storageService",
|
||||
MlModelService.__name__: "mlmodelService",
|
||||
MetadataService.__name__: "metadataService",
|
||||
SearchService.__name__: "searchService",
|
||||
# Data Asset Entities
|
||||
Table.__name__: "table",
|
||||
StoredProcedure.__name__: "storedProcedure",
|
||||
Database.__name__: "database",
|
||||
DatabaseSchema.__name__: "databaseSchema",
|
||||
Dashboard.__name__: "dashboard",
|
||||
DashboardDataModel.__name__: "dashboardDataModel",
|
||||
Pipeline.__name__: "pipeline",
|
||||
Chart.__name__: "chart",
|
||||
Topic.__name__: "topic",
|
||||
SearchIndex.__name__: "searchIndex",
|
||||
MlModel.__name__: "mlmodel",
|
||||
Container.__name__: "container",
|
||||
}
|
||||
|
@ -187,7 +187,9 @@ def log_ansi_encoded_string(
|
||||
@singledispatch
|
||||
def get_log_name(record: Entity) -> Optional[str]:
|
||||
try:
|
||||
return f"{type(record).__name__} [{getattr(record, 'name', record.entity.name).__root__}]"
|
||||
if hasattr(record, "name"):
|
||||
return f"{type(record).__name__} [{getattr(record, 'name').__root__}]"
|
||||
return f"{type(record).__name__} [{record.entity.name.__root__}]"
|
||||
except Exception:
|
||||
return str(record)
|
||||
|
||||
|
ingestion/tests/integration/airflow/__init__.py (new file, empty)
ingestion/tests/integration/airflow/test_lineage_runner.py (new file, 218 lines)
@ -0,0 +1,218 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Test lineage parser to get inlets and outlets information
|
||||
"""
|
||||
from datetime import datetime
|
||||
from unittest import TestCase
|
||||
from unittest.mock import patch
|
||||
|
||||
from airflow import DAG
|
||||
from airflow.operators.bash import BashOperator
|
||||
|
||||
from airflow_provider_openmetadata.lineage.runner import AirflowLineageRunner
|
||||
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
|
||||
from metadata.generated.schema.api.data.createDatabaseSchema import (
|
||||
CreateDatabaseSchemaRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createTable import CreateTableRequest
|
||||
from metadata.generated.schema.api.services.createDatabaseService import (
|
||||
CreateDatabaseServiceRequest,
|
||||
)
|
||||
from metadata.generated.schema.entity.data.table import Column, DataType, Table
|
||||
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
|
||||
BasicAuth,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
|
||||
MysqlConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.databaseService import (
|
||||
DatabaseConnection,
|
||||
DatabaseService,
|
||||
DatabaseServiceType,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.pipelineService import PipelineService
|
||||
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
|
||||
OpenMetadataJWTClientConfig,
|
||||
)
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.pipeline.airflow.lineage_parser import (
|
||||
OMEntity,
|
||||
get_xlets_from_dag,
|
||||
)
|
||||
|
||||
SLEEP = "sleep 1"
|
||||
PIPELINE_SERVICE_NAME = "test-lineage-runner"
|
||||
DB_SERVICE_NAME = "test-service-lineage-runner"
|
||||
OM_JWT = "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
|
||||
|
||||
|
||||
class TestAirflowLineageRuner(TestCase):
|
||||
"""
|
||||
Validate AirflowLineageRunner
|
||||
"""
|
||||
|
||||
server_config = OpenMetadataConnection(
|
||||
hostPort="http://localhost:8585/api",
|
||||
authProvider="openmetadata",
|
||||
securityConfig=OpenMetadataJWTClientConfig(jwtToken=OM_JWT),
|
||||
)
|
||||
metadata = OpenMetadata(server_config)
|
||||
|
||||
assert metadata.health_check()
|
||||
|
||||
service = CreateDatabaseServiceRequest(
|
||||
name=DB_SERVICE_NAME,
|
||||
serviceType=DatabaseServiceType.Mysql,
|
||||
connection=DatabaseConnection(
|
||||
config=MysqlConnection(
|
||||
username="username",
|
||||
authType=BasicAuth(password="password"),
|
||||
hostPort="http://localhost:1234",
|
||||
)
|
||||
),
|
||||
)
|
||||
service_type = "databaseService"
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls) -> None:
|
||||
"""
|
||||
Prepare ingredients: Table Entity + DAG
|
||||
"""
|
||||
|
||||
service_entity = cls.metadata.create_or_update(data=cls.service)
|
||||
|
||||
create_db = CreateDatabaseRequest(
|
||||
name="test-db",
|
||||
service=service_entity.fullyQualifiedName,
|
||||
)
|
||||
|
||||
create_db_entity = cls.metadata.create_or_update(data=create_db)
|
||||
|
||||
create_schema = CreateDatabaseSchemaRequest(
|
||||
name="test-schema",
|
||||
database=create_db_entity.fullyQualifiedName,
|
||||
)
|
||||
|
||||
create_schema_entity = cls.metadata.create_or_update(data=create_schema)
|
||||
|
||||
create_inlet = CreateTableRequest(
|
||||
name="lineage-test-inlet",
|
||||
databaseSchema=create_schema_entity.fullyQualifiedName,
|
||||
columns=[Column(name="id", dataType=DataType.BIGINT)],
|
||||
)
|
||||
|
||||
create_inlet_2 = CreateTableRequest(
|
||||
name="lineage-test-inlet2",
|
||||
databaseSchema=create_schema_entity.fullyQualifiedName,
|
||||
columns=[Column(name="id", dataType=DataType.BIGINT)],
|
||||
)
|
||||
|
||||
create_outlet = CreateTableRequest(
|
||||
name="lineage-test-outlet",
|
||||
databaseSchema=create_schema_entity.fullyQualifiedName,
|
||||
columns=[Column(name="id", dataType=DataType.BIGINT)],
|
||||
)
|
||||
|
||||
cls.table_inlet1: Table = cls.metadata.create_or_update(data=create_inlet)
|
||||
cls.table_inlet2: Table = cls.metadata.create_or_update(data=create_inlet_2)
|
||||
cls.table_outlet: Table = cls.metadata.create_or_update(data=create_outlet)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls) -> None:
|
||||
"""
|
||||
Clean up
|
||||
"""
|
||||
|
||||
service_id = str(
|
||||
cls.metadata.get_by_name(
|
||||
entity=DatabaseService, fqn=DB_SERVICE_NAME
|
||||
).id.__root__
|
||||
)
|
||||
|
||||
cls.metadata.delete(
|
||||
entity=DatabaseService,
|
||||
entity_id=service_id,
|
||||
recursive=True,
|
||||
hard_delete=True,
|
||||
)
|
||||
|
||||
# Service ID created from the Airflow Lineage Operator in the
|
||||
# example DAG
|
||||
pipeline_service_id = str(
|
||||
cls.metadata.get_by_name(
|
||||
entity=PipelineService, fqn=PIPELINE_SERVICE_NAME
|
||||
).id.__root__
|
||||
)
|
||||
|
||||
cls.metadata.delete(
|
||||
entity=PipelineService,
|
||||
entity_id=pipeline_service_id,
|
||||
recursive=True,
|
||||
hard_delete=True,
|
||||
)
|
||||
|
||||
def test_lineage_runner(self):
|
||||
|
||||
with DAG("test_runner", start_date=datetime(2021, 1, 1)) as dag:
|
||||
BashOperator(
|
||||
task_id="print_date",
|
||||
bash_command="date",
|
||||
inlets=[
|
||||
OMEntity(
|
||||
entity=Table,
|
||||
fqn="test-service-lineage-runner.test-db.test-schema.lineage-test-inlet",
|
||||
),
|
||||
OMEntity(
|
||||
entity=Table,
|
||||
fqn="test-service-lineage-runner.test-db.test-schema.lineage-test-inlet2",
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
BashOperator(
|
||||
task_id="sleep",
|
||||
bash_command=SLEEP,
|
||||
outlets=[
|
||||
OMEntity(
|
||||
entity=Table,
|
||||
fqn="test-service-lineage-runner.test-db.test-schema.lineage-test-outlet",
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
# skip the statuses since they require getting data from airflow's db
|
||||
with patch.object(
|
||||
AirflowLineageRunner, "add_all_pipeline_status", return_value=None
|
||||
):
|
||||
runner = AirflowLineageRunner(
|
||||
metadata=self.metadata,
|
||||
service_name=PIPELINE_SERVICE_NAME,
|
||||
dag=dag,
|
||||
xlets=get_xlets_from_dag(dag),
|
||||
only_keep_dag_lineage=True,
|
||||
)
|
||||
|
||||
runner.execute()
|
||||
|
||||
lineage_data = self.metadata.get_lineage_by_name(
|
||||
entity=Table,
|
||||
fqn=self.table_outlet.fullyQualifiedName.__root__,
|
||||
up_depth=1,
|
||||
down_depth=1,
|
||||
)
|
||||
|
||||
upstream_ids = [edge["fromEntity"] for edge in lineage_data["upstreamEdges"]]
|
||||
self.assertIn(str(self.table_inlet1.id.__root__), upstream_ids)
|
||||
self.assertIn(str(self.table_inlet2.id.__root__), upstream_ids)
|
@ -12,25 +12,68 @@
|
||||
Test lineage parser to get inlets and outlets information
|
||||
"""
|
||||
from datetime import datetime
|
||||
from typing import List, Set
|
||||
from unittest import TestCase
|
||||
|
||||
from airflow import DAG
|
||||
from airflow.operators.bash import BashOperator
|
||||
from airflow.serialization.serde import serialize
|
||||
|
||||
from metadata.generated.schema.entity.data.container import Container
|
||||
from metadata.generated.schema.entity.data.dashboard import Dashboard
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
from metadata.ingestion.source.pipeline.airflow.lineage_parser import (
|
||||
OMEntity,
|
||||
XLets,
|
||||
XLetsMode,
|
||||
_parse_xlets,
|
||||
get_xlets_from_dag,
|
||||
get_xlets_from_operator,
|
||||
parse_xlets,
|
||||
)
|
||||
|
||||
SLEEP = "sleep 1"
|
||||
|
||||
|
||||
def xlet_fqns(xlet: XLets, xlet_mode: XLetsMode) -> Set[str]:
|
||||
"""Helper method to get a set of FQNs out of the xlet"""
|
||||
return set(elem.fqn for elem in getattr(xlet, xlet_mode.value))
|
||||
|
||||
|
||||
class TestAirflowLineageParser(TestCase):
|
||||
"""
|
||||
Handle airflow lineage parser validations
|
||||
"""
|
||||
|
||||
def assertXLetsEquals(self, first: List[XLets], second: List[XLets]):
|
||||
"""
|
||||
Check that both XLet lists are the same
|
||||
|
||||
Even if they are lists, we don't care about the order.
|
||||
|
||||
Note that we cannot use sets since `OMEntity` is not hashable.
|
||||
|
||||
For this test, we will assume that by having the same FQN, the
|
||||
entity type will also be the same.
|
||||
"""
|
||||
self.assertEquals(len(first), len(second))
|
||||
|
||||
for xlet1 in first:
|
||||
match = False
|
||||
|
||||
first_inlets = xlet_fqns(xlet1, XLetsMode.INLETS)
|
||||
first_outlets = xlet_fqns(xlet1, XLetsMode.OUTLETS)
|
||||
|
||||
for xlet2 in second:
|
||||
second_inlets = xlet_fqns(xlet2, XLetsMode.INLETS)
|
||||
second_outlets = xlet_fqns(xlet2, XLetsMode.OUTLETS)
|
||||
|
||||
if first_inlets == second_inlets and first_outlets == second_outlets:
|
||||
match = True
|
||||
break
|
||||
|
||||
self.assertTrue(match)
|
||||
|
||||
def test_parse_xlets(self):
|
||||
"""
|
||||
Handle the shape validation of inlets and outlets, e.g.,
|
||||
@ -40,13 +83,19 @@ class TestAirflowLineageParser(TestCase):
|
||||
}],
|
||||
"""
|
||||
raw_xlet = [{"tables": ["A"], "more_tables": ["X"]}]
|
||||
self.assertEqual(parse_xlets(raw_xlet), {"tables": ["A"], "more_tables": ["X"]})
|
||||
self.assertEqual(
|
||||
parse_xlets(raw_xlet),
|
||||
{
|
||||
"tables": [OMEntity(entity=Table, fqn="A")],
|
||||
"more_tables": [OMEntity(entity=Table, fqn="X")],
|
||||
},
|
||||
)
|
||||
|
||||
raw_xlet_without_list = [{"tables": ["A"], "more_tables": "random"}]
|
||||
self.assertEqual(
|
||||
parse_xlets(raw_xlet_without_list),
|
||||
{
|
||||
"tables": ["A"],
|
||||
"tables": [OMEntity(entity=Table, fqn="A")],
|
||||
},
|
||||
)
|
||||
|
||||
@ -67,7 +116,7 @@ class TestAirflowLineageParser(TestCase):
|
||||
# But the outlets are parsed correctly
|
||||
self.assertEqual(
|
||||
get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS),
|
||||
{"tables": ["A"]},
|
||||
{"tables": [OMEntity(entity=Table, fqn="A")]},
|
||||
)
|
||||
|
||||
operator = BashOperator(
|
||||
@ -78,20 +127,21 @@ class TestAirflowLineageParser(TestCase):
|
||||
|
||||
self.assertEqual(
|
||||
get_xlets_from_operator(operator, xlet_mode=XLetsMode.INLETS),
|
||||
{"tables": ["A"], "more_tables": ["X"]},
|
||||
{
|
||||
"tables": [OMEntity(entity=Table, fqn="A")],
|
||||
"more_tables": [OMEntity(entity=Table, fqn="X")],
|
||||
},
|
||||
)
|
||||
self.assertIsNone(
|
||||
get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS)
|
||||
)
|
||||
|
||||
def test_get_xlets_from_dag(self):
|
||||
def test_get_string_xlets_from_dag(self):
|
||||
"""
|
||||
Check that we can properly join the xlet information from
|
||||
all operators in the DAG
|
||||
"""
|
||||
|
||||
sleep_1 = "sleep 1"
|
||||
|
||||
with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
|
||||
BashOperator(
|
||||
task_id="print_date",
|
||||
@ -101,12 +151,18 @@ class TestAirflowLineageParser(TestCase):
|
||||
|
||||
BashOperator(
|
||||
task_id="sleep",
|
||||
bash_command=sleep_1,
|
||||
bash_command=SLEEP,
|
||||
outlets={"tables": ["B"]},
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
get_xlets_from_dag(dag), [XLets(inlets={"A"}, outlets={"B"})]
|
||||
self.assertXLetsEquals(
|
||||
get_xlets_from_dag(dag),
|
||||
[
|
||||
XLets(
|
||||
inlets=[OMEntity(entity=Table, fqn="A")],
|
||||
outlets=[OMEntity(entity=Table, fqn="B")],
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
|
||||
@ -118,12 +174,18 @@ class TestAirflowLineageParser(TestCase):
|
||||
|
||||
BashOperator(
|
||||
task_id="sleep",
|
||||
bash_command=sleep_1,
|
||||
bash_command=SLEEP,
|
||||
outlets={"tables": ["B"]},
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
get_xlets_from_dag(dag), [XLets(inlets={"A"}, outlets={"B"})]
|
||||
self.assertXLetsEquals(
|
||||
get_xlets_from_dag(dag),
|
||||
[
|
||||
XLets(
|
||||
inlets=[OMEntity(entity=Table, fqn="A")],
|
||||
outlets=[OMEntity(entity=Table, fqn="B")],
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
|
||||
@ -139,18 +201,27 @@ class TestAirflowLineageParser(TestCase):
|
||||
|
||||
BashOperator(
|
||||
task_id="sleep",
|
||||
bash_command=sleep_1,
|
||||
bash_command=SLEEP,
|
||||
outlets={
|
||||
"tables": ["B"],
|
||||
"more_tables": ["Z"],
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
self.assertXLetsEquals(
|
||||
get_xlets_from_dag(dag),
|
||||
[
|
||||
XLets(inlets={"A"}, outlets={"B"}),
|
||||
XLets(inlets={"X", "Y"}, outlets={"Z"}),
|
||||
XLets(
|
||||
inlets=[OMEntity(entity=Table, fqn="A")],
|
||||
outlets=[OMEntity(entity=Table, fqn="B")],
|
||||
),
|
||||
XLets(
|
||||
inlets=[
|
||||
OMEntity(entity=Table, fqn="X"),
|
||||
OMEntity(entity=Table, fqn="Y"),
|
||||
],
|
||||
outlets=[OMEntity(entity=Table, fqn="Z")],
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
@ -165,15 +236,156 @@ class TestAirflowLineageParser(TestCase):
|
||||
|
||||
BashOperator(
|
||||
task_id="sleep",
|
||||
bash_command=sleep_1,
|
||||
bash_command=SLEEP,
|
||||
outlets={
|
||||
"tables": ["B"],
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
self.assertXLetsEquals(
|
||||
get_xlets_from_dag(dag),
|
||||
[
|
||||
XLets(inlets={"A", "B"}, outlets={"B"}),
|
||||
XLets(
|
||||
inlets=[
|
||||
OMEntity(entity=Table, fqn="A"),
|
||||
OMEntity(entity=Table, fqn="B"),
|
||||
],
|
||||
outlets=[OMEntity(entity=Table, fqn="B")],
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
def test_get_attrs_xlets_from_dag(self):
|
||||
"""
|
||||
Check that we can properly join the xlet information from
|
||||
all operators in the DAG
|
||||
"""
|
||||
with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
|
||||
BashOperator(
|
||||
task_id="print_date",
|
||||
bash_command="date",
|
||||
inlets=[
|
||||
OMEntity(entity=Table, fqn="A"),
|
||||
OMEntity(entity=Table, fqn="B"),
|
||||
],
|
||||
)
|
||||
|
||||
BashOperator(
|
||||
task_id="sleep",
|
||||
bash_command=SLEEP,
|
||||
outlets=[OMEntity(entity=Table, fqn="C")],
|
||||
)
|
||||
|
||||
BashOperator(
|
||||
task_id="sleep2",
|
||||
bash_command=SLEEP,
|
||||
outlets=[OMEntity(entity=Container, fqn="D")],
|
||||
)
|
||||
|
||||
self.assertXLetsEquals(
|
||||
get_xlets_from_dag(dag),
|
||||
[
|
||||
XLets(
|
||||
inlets=[
|
||||
OMEntity(entity=Table, fqn="A"),
|
||||
OMEntity(entity=Table, fqn="B"),
|
||||
],
|
||||
outlets=[
|
||||
OMEntity(entity=Table, fqn="C"),
|
||||
OMEntity(entity=Container, fqn="D"),
|
||||
],
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
def test_om_entity_serializer(self):
|
||||
"""To ensure the serialized DAGs will have the right shape"""
|
||||
om_entity = OMEntity(
|
||||
entity=Table,
|
||||
fqn="FQN",
|
||||
key="test",
|
||||
)
|
||||
self.assertEquals(
|
||||
str(om_entity),
|
||||
'{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}',
|
||||
)
|
||||
|
||||
om_entity = OMEntity(
|
||||
entity=Container,
|
||||
fqn="FQN",
|
||||
key="test",
|
||||
)
|
||||
self.assertEquals(
|
||||
str(om_entity),
|
||||
'{"entity": "metadata.generated.schema.entity.data.container.Container", "fqn": "FQN", "key": "test"}',
|
||||
)
|
||||
|
||||
def test_str_deserializer(self):
|
||||
"""
|
||||
Once a DAG is serialized, the xlet info will be stored as:
|
||||
```
|
||||
['{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}']
|
||||
```
|
||||
based on our custom serialization logic.
|
||||
|
||||
Validate the deserialization process.
|
||||
"""
|
||||
self.assertIsNone(_parse_xlets("random"))
|
||||
|
||||
self.assertEquals(
|
||||
_parse_xlets(
|
||||
'{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}'
|
||||
),
|
||||
{
|
||||
"test": [
|
||||
OMEntity(
|
||||
entity=Table,
|
||||
fqn="FQN",
|
||||
key="test",
|
||||
)
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEquals(
|
||||
_parse_xlets(
|
||||
'{"entity": "metadata.generated.schema.entity.data.container.Container", "fqn": "FQN", "key": "test"}'
|
||||
),
|
||||
{
|
||||
"test": [
|
||||
OMEntity(
|
||||
entity=Container,
|
||||
fqn="FQN",
|
||||
key="test",
|
||||
)
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEquals(
|
||||
_parse_xlets(
|
||||
'{"entity": "metadata.generated.schema.entity.data.dashboard.Dashboard", "fqn": "FQN", "key": "test"}'
|
||||
),
|
||||
{
|
||||
"test": [
|
||||
OMEntity(
|
||||
entity=Dashboard,
|
||||
fqn="FQN",
|
||||
key="test",
|
||||
)
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
def test_airflow_serializer(self):
|
||||
"""It should be able to serialize our models"""
|
||||
om_entity = OMEntity(
|
||||
entity=Table,
|
||||
fqn="FQN",
|
||||
key="test",
|
||||
)
|
||||
|
||||
self.assertEquals(
|
||||
serialize(om_entity).get("__data__"),
|
||||
'{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}',
|
||||
)
|
||||
|
@ -9,6 +9,116 @@ Regardless of the Airflow ingestion process you follow ([Workflow](/connectors/p
[Lineage Backend](/connectors/pipeline/airflow/lineage-backend) or [Lineage Operator](/connectors/pipeline/airflow/lineage-operator)),
OpenMetadata will try to extract the lineage information based on the tasks' `inlets` and `outlets`.

What is important to consider here is that when we are ingesting Airflow lineage, we are actually building a graph:

```
Table A (node) -> DAG (edge) -> Table B (node)
```

Where tables are nodes and DAGs (Pipelines) are edges. This means that the correct way of setting these
parameters is to provide both `inlets` and `outlets`, so that we have the nodes to build the relationship.
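To make the node/edge picture concrete, here is a simplified, hypothetical sketch of the lineage request that the backend ends up building for each inlet/outlet pair. The IDs are placeholders and the import paths are assumed from the generated schema package used elsewhere in this commit; treat this as illustrative, not verbatim runner code:

```python
from uuid import uuid4

from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference

# Placeholder IDs: the real runner resolves them with metadata.get_by_name() lookups.
table_a_id, table_b_id, pipeline_id = uuid4(), uuid4(), uuid4()

lineage = AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=table_a_id, type="table"),  # Table A (node)
        toEntity=EntityReference(id=table_b_id, type="table"),    # Table B (node)
        lineageDetails=LineageDetails(
            pipeline=EntityReference(id=pipeline_id, type="pipeline")  # the DAG (edge)
        ),
    )
)
```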
## Configuring Lineage

{% note %}

This lineage configuration method is available for OpenMetadata release 1.2.3 or higher.

{% /note %}

Let's take a look at the following example:

```python
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

from metadata.generated.schema.entity.data.container import Container
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.source.pipeline.airflow.lineage_parser import OMEntity


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=1),
}


with DAG(
    "test-lineage",
    default_args=default_args,
    description="An example DAG which runs a lineage test",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    catchup=False,
) as dag:

    t0 = DummyOperator(
        task_id='task0',
        inlets=[
            OMEntity(entity=Container, fqn="Container A", key="group_A"),
            OMEntity(entity=Table, fqn="Table X", key="group_B"),
        ]
    )

    t1 = DummyOperator(
        task_id='task10',
        outlets=[
            OMEntity(entity=Table, fqn="Table B", key="group_A"),
            OMEntity(entity=Table, fqn="Table Y", key="group_B"),
        ]
    )

    t0 >> t1
```

We are passing inlets and outlets as a list of the `OMEntity` class, which lets us specify:
1. The type of the asset we are using (Table, Container, ...), following our SDK.
2. The FQN of the asset, which is the unique name of each asset in OpenMetadata, e.g., `serviceName.databaseName.schemaName.tableName`.
3. The key to group the lineage, if needed.

This `OMEntity` class is defined following the example of Airflow's internal lineage
[models](https://github.com/apache/airflow/blob/main/airflow/lineage/entities.py).
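When Airflow serializes the DAG, each `OMEntity` is stored as a JSON string through its custom `serialize()`/`__str__` method introduced in this commit. As a small illustrative sketch (the FQN and key here are placeholders):

```python
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.source.pipeline.airflow.lineage_parser import OMEntity

# The entity class is stored as its full module path so the lineage parser can
# re-import it when reading the serialized DAG.
om_entity = OMEntity(entity=Table, fqn="service.db.schema.table", key="group_A")
print(str(om_entity))
# {"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "service.db.schema.table", "key": "group_A"}
```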
## Keys

We can declare lineage dependencies among different groups of tables. In the example above, we are not building the
lineage from all inlets to all outlets, but rather grouping the tables by key (`group_A` and `group_B`).
This means that after this lineage is processed, the relationship will be:

```
Container A (node) -> DAG (edge) -> Table B (node)
```

and

```
Table X (node) -> DAG (edge) -> Table Y (node)
```

It does not matter in which task of the DAG this inlet/outlet information is specified. During the ingestion process we
group all these details at the DAG level, as sketched below.
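For illustration, here is a rough sketch, assuming the parser introduced in this commit, of what the key-based grouping above resolves to once `get_xlets_from_dag` processes the DAG:

```python
from metadata.generated.schema.entity.data.container import Container
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.source.pipeline.airflow.lineage_parser import OMEntity, XLets

# get_xlets_from_dag(dag) joins the inlets/outlets of all tasks by key and
# returns one XLets object per key -- for the DAG above, roughly:
expected = [
    XLets(  # key "group_A"
        inlets=[OMEntity(entity=Container, fqn="Container A", key="group_A")],
        outlets=[OMEntity(entity=Table, fqn="Table B", key="group_A")],
    ),
    XLets(  # key "group_B"
        inlets=[OMEntity(entity=Table, fqn="Table X", key="group_B")],
        outlets=[OMEntity(entity=Table, fqn="Table Y", key="group_B")],
    ),
]
```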

## Configuring Lineage between Tables

{% note %}

Note that this method only allows lineage between Tables.

We will deprecate it in OpenMetadata 1.4.

{% /note %}

Let's take a look at the following example:

```python
@ -33,7 +143,7 @@ default_args = {
with DAG(
    "test-multiple-inlet-keys",
    default_args=default_args,
    description="An example DAG which runs a a task group lineage test",
    description="An example DAG which runs a lineage test",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    catchup=False,
@ -43,25 +153,22 @@ with DAG(
    t0 = DummyOperator(
        task_id='task0',
        inlets={
            "tables": ["Table A"],
            "more_tables": ["Table X"]
            "group_A": ["Table A"],
            "group_B": ["Table X"]
        }
    )

    t1 = DummyOperator(
        task_id='task10',
        outlets={
            "tables": ["Table B"],
            "more_tables": ["Table Y"]
            "group_A": ["Table B"],
            "group_B": ["Table Y"]
        }
    )

    t0 >> t1
```

Note how we have two tasks:
- `t0`: Informing the `inlets`, with keys `tables` and `more_tables`.
- `t1`: Informing the `outlets` with keys `tables` and `more_tables`.
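Under the hood, the lineage parser converts this dict-based form into `OMEntity` objects typed as `Table`, which is why this method only supports lineage between Tables. A rough sketch of the equivalence, assuming the parser from this commit:

```python
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.source.pipeline.airflow.lineage_parser import OMEntity

# inlets={"group_A": ["Table A"], "group_B": ["Table X"]} is parsed into
# Table-typed OMEntity objects grouped under the same keys:
parsed = {
    "group_A": [OMEntity(entity=Table, fqn="Table A")],
    "group_B": [OMEntity(entity=Table, fqn="Table X")],
}
```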

{% note %}

@ -70,32 +177,3 @@ Make sure to add the table Fully Qualified Name (FQN), which is the unique name
This name is composed as `serviceName.databaseName.schemaName.tableName`.

{% /note %}

What it's important to consider here is that when we are ingesting Airflow lineage, we are actually building a graph:

```
Table A (node) -> DAG (edge) -> Table B (node)
```

Where tables are nodes and DAGs (Pipelines) are considered edges. This means that the correct way of setting this
parameters is by making sure that we are informing both `inlets` and `outlets`, so that we have the nodes to build
the relationship.

## Keys

We can inform the lineage dependencies among different groups of tables. In the example above, we are not building the
lineage from all inlets to all outlets, but rather grouping the tables by the dictionary key (`tables` and `more_tables`).
This means that after this lineage is processed, the relationship will be:

```
Table A (node) -> DAG (edge) -> Table B (node)
```

and

```
Table X (node) -> DAG (edge) -> Table Y (node)
```

It does not matter in which task of the DAG these inlet/outlet information is specified. During the ingestion process we
group all these details at the DAG level.
@ -32,20 +32,11 @@ distribution:
pip3 install "openmetadata-ingestion==x.y.z"
```

Where `x.y.z` is the version of your OpenMetadata server, e.g., 0.13.0. It is important that server and client
versions match.
Where `x.y.z` is the version of your OpenMetadata server, e.g., 1.2.2. **It is important that server and client
versions match**.

### Adding Lineage Config

<Note>

If using OpenMetadata version 0.13.0 or lower, the import for the lineage backend is
`airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend`.

For 0.13.1 or higher, the import has been renamed to `airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend`.

</Note>

After the installation, we need to update the Airflow configuration. This can be done following this example on
`airflow.cfg`:

@ -81,11 +72,10 @@ max_status = 10
```

- `only_keep_dag_lineage` will remove any table lineage not present in the inlets or outlets. This will ensure
that any lineage in OpenMetadata comes from your code.
that any lineage in OpenMetadata comes only from your code.
- `max_status` controls the number of status to ingest in each run. By default, we'll pick the last 10.

In the following sections, we'll show how to adapt our pipelines to help us build the lineage information.

## Lineage Backend

@ -139,13 +129,13 @@ and downstream for outlets) between the Pipeline and Table Entities.
It is important to get the naming right, as we will fetch the Table Entity by its FQN. If no information is specified
in terms of lineage, we will just ingest the Pipeline Entity without adding further information.

<Note>
{% note %}

While we are showing here how to parse the lineage using the Lineage Backend, the setup of `inlets` and `outlets`
is supported as well through external metadata ingestion from Airflow, be it via the UI, CLI or directly running
an extraction DAG from Airflow itself.

</Note>
{% /note %}

## Example

@ -246,7 +236,7 @@ backend = airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBacke
airflow_service_name = local_airflow
openmetadata_api_endpoint = http://localhost:8585/api
auth_provider_type = openmetadata
jwt_token = eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg
jwt_token = ...
```

After running the DAG, you should be able to see the following information in the ingested Pipeline: