Fix #3645 - Revisit lineage for Airflow 1.10.15 (#3696)

Fix #3645 - Revisit lineage for Airflow 1.10.15 (#3696)
This commit is contained in:
Pere Miquel Brull 2022-03-28 08:33:02 +02:00 committed by GitHub
parent 351db56687
commit 1c21abebdd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 12 deletions

View File

@ -40,7 +40,7 @@ base_requirements = {
"python-jose==3.3.0", "python-jose==3.3.0",
"sqlalchemy>=1.4.0", "sqlalchemy>=1.4.0",
"sql-metadata~=2.0.0", "sql-metadata~=2.0.0",
"requests~=2.26", "requests>=2.23",
"cryptography", "cryptography",
"Jinja2>=2.11.3", "Jinja2>=2.11.3",
"PyYAML", "PyYAML",
@ -66,7 +66,9 @@ plugins: Dict[str, Set[str]] = {
"marshmallow-sqlalchemy>=0.26.0", "marshmallow-sqlalchemy>=0.26.0",
"SQLAlchemy-Utils>=0.38.0", "SQLAlchemy-Utils>=0.38.0",
"pymysql>=1.0.2", "pymysql>=1.0.2",
"requests==2.26.0",
}, },
"airflow-container-1.10.15": {"markupsafe==2.0.1 ", "requests==2.23.0"},
"amundsen": {"neo4j~=4.4.0"}, "amundsen": {"neo4j~=4.4.0"},
"athena": {"PyAthena[SQLAlchemy]"}, "athena": {"PyAthena[SQLAlchemy]"},
"atlas": {}, "atlas": {},
@ -187,7 +189,7 @@ setup(
*[ *[
requirements requirements
for plugin, requirements in plugins.items() for plugin, requirements in plugins.items()
if plugin != "db2" if plugin not in {"airflow-container-1.10.15", "db2"}
] ]
) )
), ),

View File

@ -58,8 +58,8 @@ class OpenMetadataLineageBackend(LineageBackend):
_ = get_lineage_config() _ = get_lineage_config()
# pylint: disable=protected-access # pylint: disable=protected-access
@staticmethod # needed for Airflow 1.10.x
def send_lineage( def send_lineage(
self,
operator: "BaseOperator", operator: "BaseOperator",
inlets: Optional[List] = None, inlets: Optional[List] = None,
outlets: Optional[List] = None, outlets: Optional[List] = None,

View File

@ -14,7 +14,7 @@ OpenMetadata Airflow Lineage Backend
""" """
import traceback import traceback
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union from typing import TYPE_CHECKING, Dict, List, Optional
from airflow.configuration import conf from airflow.configuration import conf
@ -70,9 +70,37 @@ def is_airflow_version_1() -> bool:
return True return True
def parse_v1_xlets(xlet: dict) -> Optional[List[str]]:
"""
Parse airflow xlets for V1
:param xlet: airflow v1 xlet dict
:return: table list or None
"""
if isinstance(xlet, dict):
tables = xlet.get("tables")
if tables and isinstance(tables, list):
return tables
return None
def parse_xlets(xlet: List[dict]) -> Optional[List[str]]:
"""
Parse airflow xlets for V1
:param xlet: airflow v2 xlet dict
:return: table list or None
"""
if len(xlet) and isinstance(xlet[0], dict):
tables = xlet[0].get("tables")
if tables and isinstance(tables, list):
return tables
return None
def get_xlets( def get_xlets(
operator: "BaseOperator", xlet_mode: str = "_inlets" operator: "BaseOperator", xlet_mode: str = "_inlets"
) -> Union[Optional[List[str]], Any]: ) -> Optional[List[str]]:
""" """
Given an Airflow DAG Task, obtain the tables Given an Airflow DAG Task, obtain the tables
set in inlets or outlets. set in inlets or outlets.
@ -86,15 +114,15 @@ def get_xlets(
""" """
xlet = getattr(operator, xlet_mode) xlet = getattr(operator, xlet_mode)
if is_airflow_version_1(): if is_airflow_version_1():
return xlet tables = parse_v1_xlets(xlet)
if len(xlet) and isinstance(xlet[0], dict): else:
tables = xlet[0].get("tables") tables = parse_xlets(xlet)
if isinstance(tables, list) and len(tables):
return tables
operator.log.info(f"Not finding proper {xlet_mode} in task {operator.task_id}") if not tables:
return None operator.log.info(f"Not finding proper {xlet_mode} in task {operator.task_id}")
return tables
def create_or_update_pipeline( # pylint: disable=too-many-locals def create_or_update_pipeline( # pylint: disable=too-many-locals