feat(ingest/airflow): support airflow 2.8 dataset listeners (#9664)

This commit is contained in:
Harshal Sheth 2024-01-19 16:18:15 -08:00 committed by GitHub
parent bd9b0c49fb
commit 14dee5723d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 37 additions and 5 deletions

View File

@ -40,13 +40,16 @@ jobs:
extra_pip_requirements: "apache-airflow~=2.2.4"
extra_pip_extras: plugin-v1
- python-version: "3.10"
extra_pip_requirements: 'apache-airflow~=2.4.0 pluggy==1.0.0 "pendulum<3.0"'
extra_pip_requirements: 'apache-airflow~=2.4.0 pluggy==1.0.0 "pendulum<3.0" "Flask-Session<0.6.0"'
extra_pip_extras: plugin-v2
- python-version: "3.10"
extra_pip_requirements: 'apache-airflow~=2.6.0 "pendulum<3.0"'
extra_pip_requirements: 'apache-airflow~=2.6.0 "pendulum<3.0" "Flask-Session<0.6.0"'
extra_pip_extras: plugin-v2
- python-version: "3.10"
extra_pip_requirements: "apache-airflow>=2.7.0 pydantic==2.4.2"
extra_pip_requirements: 'apache-airflow~=2.7.0 pydantic==2.4.2 "Flask-Session<0.6.0"'
extra_pip_extras: plugin-v2
- python-version: "3.10"
extra_pip_requirements: 'apache-airflow>=2.8.0 pydantic>=2.4.2 "Flask-Session<0.6.0"'
extra_pip_extras: plugin-v2
fail-fast: false
steps:

View File

@ -32,6 +32,9 @@ PLUGGY_VERSION = packaging.version.parse(pluggy.__version__)
HAS_AIRFLOW_STANDALONE_CMD = AIRFLOW_VERSION >= packaging.version.parse("2.2.0.dev0")
HAS_AIRFLOW_LISTENER_API = AIRFLOW_VERSION >= packaging.version.parse("2.3.0.dev0")
HAS_AIRFLOW_DAG_LISTENER_API = AIRFLOW_VERSION >= packaging.version.parse("2.5.0.dev0")
HAS_AIRFLOW_DATASET_LISTENER_API = AIRFLOW_VERSION >= packaging.version.parse(
"2.8.0.dev0"
)
NEEDS_AIRFLOW_LISTENER_MODULE = AIRFLOW_VERSION < packaging.version.parse(
"2.5.0.dev0"
) or PLUGGY_VERSION <= packaging.version.parse("1.0.0")

View File

@ -24,6 +24,7 @@ from openlineage.client.serde import Serde
from datahub_airflow_plugin._airflow_shims import (
HAS_AIRFLOW_DAG_LISTENER_API,
HAS_AIRFLOW_DATASET_LISTENER_API,
Operator,
get_task_inlets,
get_task_outlets,
@ -40,6 +41,7 @@ from datahub_airflow_plugin.entities import (
_F = TypeVar("_F", bound=Callable[..., None])
if TYPE_CHECKING:
from airflow.datasets import Dataset
from airflow.models import DAG, DagRun, TaskInstance
from sqlalchemy.orm import Session
@ -502,3 +504,23 @@ class DataHubListener:
self.emitter.flush()
# TODO: Add hooks for on_dag_run_success, on_dag_run_failed -> call AirflowGenerator.complete_dataflow
if HAS_AIRFLOW_DATASET_LISTENER_API:
@hookimpl
@run_in_thread
def on_dataset_created(self, dataset: "Dataset") -> None:
self._set_log_level()
logger.debug(
f"DataHub listener got notification about dataset create for {dataset}"
)
@hookimpl
@run_in_thread
def on_dataset_changed(self, dataset: "Dataset") -> None:
self._set_log_level()
logger.debug(
f"DataHub listener got notification about dataset change for {dataset}"
)

View File

@ -4,7 +4,7 @@
# and then run "tox" from this directory.
[tox]
envlist = py38-airflow21, py38-airflow22, py310-airflow24, py310-airflow26, py310-airflow27
envlist = py38-airflow21, py38-airflow22, py310-airflow24, py310-airflow26, py310-airflow27, py310-airflow28
[testenv]
use_develop = true
@ -30,10 +30,14 @@ deps =
# constraints file is overly restrictive.
airflow27: apache-airflow~=2.7.0
airflow27: pydantic==2.4.2
airflow28: apache-airflow~=2.8.0
# Apparently Flask-Session 0.6.0 was released by accident.
# See https://github.com/pallets-eco/flask-session/issues/209
airflow24,airflow26,airflow27,airflow28: Flask-Session<0.6.0
commands =
pytest --cov-append {posargs}
# For Airflow 2.4+, add the plugin-v2 extra.
[testenv:py310-airflow{24,26,27}]
[testenv:py310-airflow{24,26,27,28}]
extras = dev,integration-tests,plugin-v2