diff --git a/.github/workflows/airflow-plugin.yml b/.github/workflows/airflow-plugin.yml index 158d3416bc..7ae7b87b0f 100644 --- a/.github/workflows/airflow-plugin.yml +++ b/.github/workflows/airflow-plugin.yml @@ -40,13 +40,16 @@ jobs: extra_pip_requirements: "apache-airflow~=2.2.4" extra_pip_extras: plugin-v1 - python-version: "3.10" - extra_pip_requirements: 'apache-airflow~=2.4.0 pluggy==1.0.0 "pendulum<3.0"' + extra_pip_requirements: 'apache-airflow~=2.4.0 pluggy==1.0.0 "pendulum<3.0" "Flask-Session<0.6.0"' extra_pip_extras: plugin-v2 - python-version: "3.10" - extra_pip_requirements: 'apache-airflow~=2.6.0 "pendulum<3.0"' + extra_pip_requirements: 'apache-airflow~=2.6.0 "pendulum<3.0" "Flask-Session<0.6.0"' extra_pip_extras: plugin-v2 - python-version: "3.10" - extra_pip_requirements: "apache-airflow>=2.7.0 pydantic==2.4.2" + extra_pip_requirements: 'apache-airflow~=2.7.0 pydantic==2.4.2 "Flask-Session<0.6.0"' + extra_pip_extras: plugin-v2 + - python-version: "3.10" + extra_pip_requirements: 'apache-airflow>=2.8.0 pydantic>=2.4.2 "Flask-Session<0.6.0"' extra_pip_extras: plugin-v2 fail-fast: false steps: diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py index d384958cf3..c1e2dd4cc4 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py @@ -32,6 +32,9 @@ PLUGGY_VERSION = packaging.version.parse(pluggy.__version__) HAS_AIRFLOW_STANDALONE_CMD = AIRFLOW_VERSION >= packaging.version.parse("2.2.0.dev0") HAS_AIRFLOW_LISTENER_API = AIRFLOW_VERSION >= packaging.version.parse("2.3.0.dev0") HAS_AIRFLOW_DAG_LISTENER_API = AIRFLOW_VERSION >= packaging.version.parse("2.5.0.dev0") +HAS_AIRFLOW_DATASET_LISTENER_API = AIRFLOW_VERSION >= packaging.version.parse( + "2.8.0.dev0" +) NEEDS_AIRFLOW_LISTENER_MODULE = AIRFLOW_VERSION < packaging.version.parse( "2.5.0.dev0" ) or PLUGGY_VERSION <= packaging.version.parse("1.0.0") diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index debc91700d..a7f588a166 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -24,6 +24,7 @@ from openlineage.client.serde import Serde from datahub_airflow_plugin._airflow_shims import ( HAS_AIRFLOW_DAG_LISTENER_API, + HAS_AIRFLOW_DATASET_LISTENER_API, Operator, get_task_inlets, get_task_outlets, @@ -40,6 +41,7 @@ from datahub_airflow_plugin.entities import ( _F = TypeVar("_F", bound=Callable[..., None]) if TYPE_CHECKING: + from airflow.datasets import Dataset from airflow.models import DAG, DagRun, TaskInstance from sqlalchemy.orm import Session @@ -502,3 +504,23 @@ class DataHubListener: self.emitter.flush() # TODO: Add hooks for on_dag_run_success, on_dag_run_failed -> call AirflowGenerator.complete_dataflow + + if HAS_AIRFLOW_DATASET_LISTENER_API: + + @hookimpl + @run_in_thread + def on_dataset_created(self, dataset: "Dataset") -> None: + self._set_log_level() + + logger.debug( + f"DataHub listener got notification about dataset create for {dataset}" + ) + + @hookimpl + @run_in_thread + def on_dataset_changed(self, dataset: "Dataset") -> None: + self._set_log_level() + + logger.debug( + f"DataHub listener got notification about dataset change for {dataset}" + ) diff --git a/metadata-ingestion-modules/airflow-plugin/tox.ini b/metadata-ingestion-modules/airflow-plugin/tox.ini index 27ae2ce65b..154ced6b8d 100644 --- a/metadata-ingestion-modules/airflow-plugin/tox.ini +++ b/metadata-ingestion-modules/airflow-plugin/tox.ini @@ -4,7 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = py38-airflow21, py38-airflow22, py310-airflow24, py310-airflow26, py310-airflow27 +envlist = py38-airflow21, py38-airflow22, py310-airflow24, py310-airflow26, py310-airflow27, py310-airflow28 [testenv] use_develop = true @@ -30,10 +30,14 @@ deps = # constraints file is overly restrictive. airflow27: apache-airflow~=2.7.0 airflow27: pydantic==2.4.2 + airflow28: apache-airflow~=2.8.0 + # Apparently Flask-Session 0.6.0 was released by accident. + # See https://github.com/pallets-eco/flask-session/issues/209 + airflow24,airflow26,airflow27,airflow28: Flask-Session<0.6.0 commands = pytest --cov-append {posargs} # For Airflow 2.4+, add the plugin-v2 extra. -[testenv:py310-airflow{24,26,27}] +[testenv:py310-airflow{24,26,27,28}] extras = dev,integration-tests,plugin-v2