Fix #12190 - Bring support for Airflow 2.6 & Use Airflow 2.6.3 as the new ingestion base (#12398)

* Bump Airflow version

* Support Airflow 2.6

* Lint

* Bump airflow version

* Fix airflow 2.6 lineage

* Fix airflow 2.6 lineage
Pere Miquel Brull, 2023-07-13 06:41:59 +02:00, committed by GitHub
commit a3bff293a4, parent 31144961e1
7 changed files with 95 additions and 28 deletions

File 1/7: Dockerfile (Airflow ingestion base image)

@@ -1,4 +1,4 @@
-FROM apache/airflow:2.3.3-python3.9
+FROM apache/airflow:2.6.3-python3.9
 USER root
 RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
 RUN curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
@@ -75,7 +75,7 @@ USER airflow
 # Argument to provide for Ingestion Dependencies to install. Defaults to all
 ARG INGESTION_DEPENDENCY="all"
 RUN pip install --upgrade pip
-RUN pip install "openmetadata-managed-apis==1.2.0.0.dev0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.3.3/constraints-3.9.txt"
+RUN pip install "openmetadata-managed-apis==1.2.0.0.dev0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.9.txt"
 RUN pip install "openmetadata-ingestion[${INGESTION_DEPENDENCY}]==1.2.0.0.dev0"
 # Temporary workaround for https://github.com/open-metadata/OpenMetadata/issues/9593
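Not part of the commit, but a quick sanity check for the rebased image: run inside the built container, the sketch below confirms that the constraints file resolved Airflow to the pinned release. It assumes Python 3.8+ for importlib.metadata, which holds for this python3.9 base.

```python
# Hypothetical sanity check for the rebuilt image (not from this commit):
# confirm the constraints pin resolved to the expected Airflow release.
from importlib.metadata import version

assert version("apache-airflow") == "2.6.3", version("apache-airflow")
print("Image is on the Airflow 2.6.3 base, as expected")
```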

File 2/7: Dockerfile (local ingestion image that ships the example DAGs)

@@ -1,4 +1,4 @@
-FROM apache/airflow:2.3.3-python3.9
+FROM apache/airflow:2.6.3-python3.9
 USER root
 RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
 RUN curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
@@ -74,7 +74,7 @@ COPY --chown=airflow:0 ingestion/examples/airflow/dags /opt/airflow/dags
 # Provide Execute Permissions to shell script
 RUN chmod +x /opt/airflow/ingestion_dependency.sh
 USER airflow
-ARG AIRFLOW_CONSTRAINTS_LOCATION="https://raw.githubusercontent.com/apache/airflow/constraints-2.3.3/constraints-3.9.txt"
+ARG AIRFLOW_CONSTRAINTS_LOCATION="https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.9.txt"
 # Argument to provide for Ingestion Dependencies to install. Defaults to all
 RUN pip install --upgrade pip

File 3/7: setup.py (ingestion package version pins)

@@ -28,7 +28,7 @@ def get_long_description():
 # Add here versions required for multiple plugins
 VERSIONS = {
-    "airflow": "apache-airflow==2.3.3",
+    "airflow": "apache-airflow==2.6.3",
     "avro": "avro~=1.11",
     "boto3": "boto3>=1.20,<2.0",  # No need to add botocore separately. It's a dep from boto3
     "geoalchemy2": "GeoAlchemy2~=0.12",

File 4/7: lineage_parser.py (metadata.ingestion.source.pipeline.airflow)

@@ -64,14 +64,25 @@ and we'll treat this as independent sets of lineage
 """
 import logging
 import traceback
+from enum import Enum
 from typing import Dict, List, Optional, Set
 
 from pydantic import BaseModel
 
 logger = logging.getLogger("airflow.task")
 
-INLETS_ATTR = "_inlets"
-OUTLETS_ATTR = "_outlets"
+
+class XLetsMode(Enum):
+    INLETS = "inlets"
+    OUTLETS = "outlets"
+
+
+class XLetsAttr(Enum):
+    INLETS = "inlets"
+    PRIVATE_INLETS = "_inlets"
+    OUTLETS = "outlets"
+    PRIVATE_OUTLETS = "_outlets"
+
 
 class XLets(BaseModel):
@@ -107,7 +118,7 @@ def parse_xlets(xlet: List[dict]) -> Optional[Dict[str, List[str]]]:
 def get_xlets_from_operator(
-    operator: "BaseOperator", xlet_mode: str = INLETS_ATTR
+    operator: "BaseOperator", xlet_mode: XLetsMode
 ) -> Optional[Dict[str, List[str]]]:
     """
     Given an Airflow DAG Task, obtain the tables
@@ -120,7 +131,25 @@ def get_xlets_from_operator(
     :param xlet_mode: get inlet or outlet
     :return: list of tables FQN
     """
-    xlet = getattr(operator, xlet_mode) if hasattr(operator, xlet_mode) else []
+    attribute = None
+    if xlet_mode == XLetsMode.INLETS:
+        attribute = (
+            XLetsAttr.INLETS.value
+            if hasattr(operator, XLetsAttr.INLETS.value)
+            else XLetsAttr.PRIVATE_INLETS.value
+        )
+
+    if xlet_mode == XLetsMode.OUTLETS:
+        attribute = (
+            XLetsAttr.OUTLETS.value
+            if hasattr(operator, XLetsAttr.OUTLETS.value)
+            else XLetsAttr.PRIVATE_OUTLETS.value
+        )
+
+    if attribute is None:
+        raise ValueError(f"Missing attribute for {xlet_mode.value}")
+
+    xlet = getattr(operator, attribute) or []
+
     xlet_data = parse_xlets(xlet)
     if not xlet_data:
@@ -146,14 +175,14 @@ def get_xlets_from_dag(dag: "DAG") -> List[XLets]:
         _inlets.update(
             get_xlets_from_operator(
                 operator=task,
-                xlet_mode=INLETS_ATTR if hasattr(task, INLETS_ATTR) else "inlets",
+                xlet_mode=XLetsMode.INLETS,
             )
             or []
         )
 
         _outlets.update(
             get_xlets_from_operator(
                 operator=task,
-                xlet_mode=OUTLETS_ATTR if hasattr(task, INLETS_ATTR) else "outlets",
+                xlet_mode=XLetsMode.OUTLETS,
             )
             or []
         )
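To make the new API concrete, here is a minimal usage sketch mirroring the unit tests in the next file (it is not code from this commit): a task declares its outlets, and the parser resolves them through XLetsMode, preferring the public inlets/outlets attributes that Airflow 2.6 populates and falling back to the private _inlets/_outlets of older releases.

```python
from airflow.operators.bash import BashOperator

from metadata.ingestion.source.pipeline.airflow.lineage_parser import (
    XLetsMode,
    get_xlets_from_operator,
)

operator = BashOperator(
    task_id="print_date",
    bash_command="date",
    outlets={"tables": ["A"]},  # lineage declared on the task
)

# Resolves `outlets` (or `_outlets` on pre-2.6 Airflow) and parses the tables
print(get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS))
# -> {'tables': ['A']}
```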

File 5/7: unit tests for the Airflow lineage parser

@@ -18,9 +18,8 @@ from airflow import DAG
 from airflow.operators.bash import BashOperator
 
 from metadata.ingestion.source.pipeline.airflow.lineage_parser import (
-    INLETS_ATTR,
-    OUTLETS_ATTR,
     XLets,
+    XLetsMode,
     get_xlets_from_dag,
     get_xlets_from_operator,
     parse_xlets,
@@ -64,11 +63,11 @@
             outlets={"tables": ["A"]},
         )
 
         # By default we try with inlets. There are none here
-        self.assertIsNone(get_xlets_from_operator(operator))
+        self.assertIsNone(get_xlets_from_operator(operator, XLetsMode.INLETS))
         # But the outlets are parsed correctly
         self.assertEqual(
-            get_xlets_from_operator(operator, xlet_mode=OUTLETS_ATTR), {"tables": ["A"]}
+            get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS),
+            {"tables": ["A"]},
         )
 
         operator = BashOperator(
@@ -78,10 +77,12 @@
         )
 
         self.assertEqual(
-            get_xlets_from_operator(operator, xlet_mode=INLETS_ATTR),
+            get_xlets_from_operator(operator, xlet_mode=XLetsMode.INLETS),
             {"tables": ["A"], "more_tables": ["X"]},
         )
-        self.assertIsNone(get_xlets_from_operator(operator, xlet_mode=OUTLETS_ATTR))
+        self.assertIsNone(
+            get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS)
+        )
 
     def test_get_xlets_from_dag(self):
         """
@@ -89,6 +90,8 @@
         all operators in the DAG
         """
+        sleep_1 = "sleep 1"
+
         with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag:
             BashOperator(
                 task_id="print_date",
@@ -98,7 +101,7 @@
             BashOperator(
                 task_id="sleep",
-                bash_command="sleep 1",
+                bash_command=sleep_1,
                 outlets={"tables": ["B"]},
             )
@@ -115,7 +118,7 @@
             BashOperator(
                 task_id="sleep",
-                bash_command="sleep 1",
+                bash_command=sleep_1,
                 outlets={"tables": ["B"]},
             )
@@ -136,7 +139,7 @@
             BashOperator(
                 task_id="sleep",
-                bash_command="sleep 1",
+                bash_command=sleep_1,
                 outlets={
                     "tables": ["B"],
                     "more_tables": ["Z"],
@@ -162,7 +165,7 @@
             BashOperator(
                 task_id="sleep",
-                bash_command="sleep 1",
+                bash_command=sleep_1,
                 outlets={
                     "tables": ["B"],
                 },

File 6/7: openmetadata-managed-apis, DAG rescan task

@@ -18,8 +18,8 @@ from multiprocessing import Process
 from typing import Optional
 
 from airflow import settings
-from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.models import DagBag
+from airflow.version import version as airflow_version
 from flask import request
 from openmetadata_managed_apis.utils.logger import api_logger
@@ -110,15 +110,50 @@ def get_dagbag():
 class ScanDagsTask(Process):
     def run(self):
-        scheduler_job = SchedulerJob(num_times_parse_dags=1)
-        scheduler_job.heartrate = 0
-        scheduler_job.run()
+        if airflow_version >= "2.6":
+            scheduler_job = self._run_new_scheduler_job()
+        else:
+            scheduler_job = self._run_old_scheduler_job()
+
         try:
             scheduler_job.kill()
         except Exception as exc:
             logger.debug(traceback.format_exc())
             logger.info(f"Rescan Complete: Killed Job: {exc}")
 
+    @staticmethod
+    def _run_new_scheduler_job() -> "Job":
+        """
+        Run the new scheduler job from Airflow 2.6
+        """
+        from airflow.jobs.job import Job, run_job
+        from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
+
+        scheduler_job = Job()
+        job_runner = SchedulerJobRunner(
+            job=scheduler_job,
+            num_runs=1,
+        )
+        scheduler_job.heartrate = 0
+        # pylint: disable=protected-access
+        run_job(scheduler_job, execute_callable=job_runner._execute)
+        return scheduler_job
+
+    @staticmethod
+    def _run_old_scheduler_job() -> "SchedulerJob":
+        """
+        Run the old scheduler job before 2.6
+        """
+        from airflow.jobs.scheduler_job import SchedulerJob
+
+        scheduler_job = SchedulerJob(num_times_parse_dags=1)
+        scheduler_job.heartrate = 0
+        scheduler_job.run()
+        return scheduler_job
+
 
 def scan_dags_job_background():
     """

File 7/7: docs, "Custom Airflow Installation" page

@@ -36,7 +36,6 @@ as a starting point.
 If you are using our `openmetadata/ingestion` Docker image, there is just one thing to do: Configure the OpenMetadata server.
 
 The OpenMetadata server takes all its configurations from a YAML file. You can find them in our [repo](https://github.com/open-metadata/OpenMetadata/tree/main/conf). In
 `openmetadata.yaml`, update the `pipelineServiceClientConfiguration` section accordingly.
@@ -90,7 +89,8 @@ openmetadata:
 ## Custom Airflow Installation
 
 {% note %}
-Note that the `openmetadata-ingestion` only supports Python versions 3.7, 3.8 and 3.9.
+- Note that the `openmetadata-ingestion` only supports Python versions 3.7, 3.8 and 3.9.
+- The supported Airflow versions are 2.3, 2.4 and 2.5. From release 1.1.1 onwards, OpenMetadata will also support Airflow 2.6.
 {% /note %}
 
 You will need to follow three steps: