feat(airflow): drop Airflow < 2.3 support + make plugin v2 the default (#12056)

This commit is contained in:
Harshal Sheth 2024-12-09 14:08:49 -05:00 committed by GitHub
parent 4811de1c1d
commit 2babbe6b0f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 241 additions and 72 deletions

View File

@ -34,29 +34,21 @@ jobs:
include: include:
# Note: this should be kept in sync with tox.ini. # Note: this should be kept in sync with tox.ini.
- python-version: "3.8" - python-version: "3.8"
extra_pip_requirements: "apache-airflow~=2.1.4" extra_pip_requirements: "apache-airflow~=2.3.4"
extra_pip_extras: plugin-v1 extra_pip_extras: test-airflow23
- python-version: "3.8"
extra_pip_requirements: "apache-airflow~=2.2.4"
extra_pip_extras: plugin-v1
- python-version: "3.10" - python-version: "3.10"
extra_pip_requirements: "apache-airflow~=2.4.3" extra_pip_requirements: "apache-airflow~=2.4.3"
extra_pip_extras: plugin-v2,test-airflow24 extra_pip_extras: test-airflow24
- python-version: "3.10" - python-version: "3.10"
extra_pip_requirements: "apache-airflow~=2.6.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.10.txt" extra_pip_requirements: "apache-airflow~=2.6.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.10.txt"
extra_pip_extras: plugin-v2
- python-version: "3.10" - python-version: "3.10"
extra_pip_requirements: "apache-airflow~=2.7.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt" extra_pip_requirements: "apache-airflow~=2.7.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"
extra_pip_extras: plugin-v2
- python-version: "3.10" - python-version: "3.10"
extra_pip_requirements: "apache-airflow~=2.8.1 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt" extra_pip_requirements: "apache-airflow~=2.8.1 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt"
extra_pip_extras: plugin-v2
- python-version: "3.11" - python-version: "3.11"
extra_pip_requirements: "apache-airflow~=2.9.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt" extra_pip_requirements: "apache-airflow~=2.9.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt"
extra_pip_extras: plugin-v2
- python-version: "3.11" - python-version: "3.11"
extra_pip_requirements: "apache-airflow~=2.10.2 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.11.txt" extra_pip_requirements: "apache-airflow~=2.10.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.3/constraints-3.11.txt"
extra_pip_extras: plugin-v2
fail-fast: false fail-fast: false
steps: steps:
- name: Set up JDK 17 - name: Set up JDK 17

View File

@ -35,6 +35,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting directly within `sources_to_platform_instance.<key>.database`. - #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting directly within `sources_to_platform_instance.<key>.database`.
- #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information. - #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information.
- #12056: The DataHub Airflow plugin no longer supports Airflow 2.1 and Airflow 2.2.
- #12056: The DataHub Airflow plugin now defaults to the v2 plugin implementation.
### Breaking Changes ### Breaking Changes

View File

@ -13,14 +13,14 @@ The DataHub Airflow plugin supports:
- Task run information, including task successes and failures. - Task run information, including task successes and failures.
- Manual lineage annotations using `inlets` and `outlets` on Airflow operators. - Manual lineage annotations using `inlets` and `outlets` on Airflow operators.
There's two actively supported implementations of the plugin, with different Airflow version support. There's two implementations of the plugin, with different Airflow version support.
| Approach | Airflow Version | Notes | | Approach | Airflow Versions | Notes |
| --------- | --------------- | --------------------------------------------------------------------------- | | --------- | ---------------- | --------------------------------------------------------------------------------------- |
| Plugin v2 | 2.3.4+ | Recommended. Requires Python 3.8+ | | Plugin v2 | 2.3.4+ | Recommended. Requires Python 3.8+ |
| Plugin v1 | 2.1 - 2.8 | No automatic lineage extraction; may not extract lineage if the task fails. | | Plugin v1 | 2.3 - 2.8 | Deprecated. No automatic lineage extraction; may not extract lineage if the task fails. |
If you're using Airflow older than 2.1, it's possible to use the v1 plugin with older versions of `acryl-datahub-airflow-plugin`. See the [compatibility section](#compatibility) for more details. If you're using Airflow older than 2.3, it's possible to use the v1 plugin with older versions of `acryl-datahub-airflow-plugin`. See the [compatibility section](#compatibility) for more details.
<!-- TODO: Update the local Airflow guide and link to it here. --> <!-- TODO: Update the local Airflow guide and link to it here. -->
<!-- If you are looking to run Airflow and DataHub using docker locally, follow the guide [here](../../docker/airflow/local_airflow.md). --> <!-- If you are looking to run Airflow and DataHub using docker locally, follow the guide [here](../../docker/airflow/local_airflow.md). -->
@ -29,7 +29,7 @@ If you're using Airflow older than 2.1, it's possible to use the v1 plugin with
### Installation ### Installation
The v2 plugin requires Airflow 2.3+ and Python 3.8+. If you don't meet these requirements, use the v1 plugin instead. The v2 plugin requires Airflow 2.3+ and Python 3.8+. If you don't meet these requirements, see the [compatibility section](#compatibility) for other options.
```shell ```shell
pip install 'acryl-datahub-airflow-plugin[plugin-v2]' pip install 'acryl-datahub-airflow-plugin[plugin-v2]'
@ -84,9 +84,10 @@ enabled = True # default
### Installation ### Installation
The v1 plugin requires Airflow 2.1 - 2.8 and Python 3.8+. If you're on older versions, it's still possible to use an older version of the plugin. See the [compatibility section](#compatibility) for more details. The v1 plugin requires Airflow 2.3 - 2.8 and Python 3.8+. If you're on older versions, it's still possible to use an older version of the plugin. See the [compatibility section](#compatibility) for more details.
If you're using Airflow 2.3+, we recommend using the v2 plugin instead. If you need to use the v1 plugin with Airflow 2.3+, you must also set the environment variable `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN=true`. Note that the v1 plugin is less featureful than the v2 plugin, and is overall not actively maintained.
Since datahub v0.15.0, the v2 plugin has been the default. If you need to use the v1 plugin with `acryl-datahub-airflow-plugin` v0.15.0+, you must also set the environment variable `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN=true`.
```shell ```shell
pip install 'acryl-datahub-airflow-plugin[plugin-v1]' pip install 'acryl-datahub-airflow-plugin[plugin-v1]'
@ -340,11 +341,12 @@ The solution is to upgrade `acryl-datahub-airflow-plugin>=0.12.0.4` or upgrade `
## Compatibility ## Compatibility
We no longer officially support Airflow <2.1. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow. We no longer officially support Airflow <2.3. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow.
Both of these options support Python 3.7+. The first two options support Python 3.7+, and the last option supports Python 3.8+.
- Airflow 1.10.x, use DataHub plugin v1 with acryl-datahub-airflow-plugin <= 0.9.1.0. - Airflow 1.10.x, use DataHub plugin v1 with acryl-datahub-airflow-plugin <= 0.9.1.0.
- Airflow 2.0.x, use DataHub plugin v1 with acryl-datahub-airflow-plugin <= 0.11.0.1. - Airflow 2.0.x, use DataHub plugin v1 with acryl-datahub-airflow-plugin <= 0.11.0.1.
- Airflow 2.2.x, use DataHub plugin v2 with acryl-datahub-airflow-plugin <= 0.14.1.5.
DataHub also previously supported an Airflow [lineage backend](https://airflow.apache.org/docs/apache-airflow/2.2.0/lineage.html#lineage-backend) implementation. While the implementation is still in our codebase, it is deprecated and will be removed in a future release. DataHub also previously supported an Airflow [lineage backend](https://airflow.apache.org/docs/apache-airflow/2.2.0/lineage.html#lineage-backend) implementation. While the implementation is still in our codebase, it is deprecated and will be removed in a future release.
Note that the lineage backend did not support automatic lineage extraction, did not capture task failures, and did not work in AWS MWAA. Note that the lineage backend did not support automatic lineage extraction, did not capture task failures, and did not work in AWS MWAA.

View File

@ -13,7 +13,7 @@ if (!project.hasProperty("extra_pip_requirements")) {
ext.extra_pip_requirements = "" ext.extra_pip_requirements = ""
} }
if (!project.hasProperty("extra_pip_extras")) { if (!project.hasProperty("extra_pip_extras")) {
ext.extra_pip_extras = "plugin-v2" ext.extra_pip_extras = ""
} }
// If extra_pip_extras is non-empty, we need to add a comma to the beginning of the string. // If extra_pip_extras is non-empty, we need to add a comma to the beginning of the string.
if (extra_pip_extras != "") { if (extra_pip_extras != "") {

View File

@ -24,8 +24,8 @@ _self_pin = (
base_requirements = { base_requirements = {
f"acryl-datahub[datahub-rest]{_self_pin}", f"acryl-datahub[datahub-rest]{_self_pin}",
# Actual dependencies. # We require Airflow 2.3.x, since we need the new DAG listener API.
"apache-airflow >= 2.0.2", "apache-airflow>=2.3.0",
} }
plugins: Dict[str, Set[str]] = { plugins: Dict[str, Set[str]] = {
@ -44,12 +44,13 @@ plugins: Dict[str, Set[str]] = {
# We remain restrictive on the versions allowed here to prevent # We remain restrictive on the versions allowed here to prevent
# us from being broken by backwards-incompatible changes in the # us from being broken by backwards-incompatible changes in the
# underlying package. # underlying package.
"openlineage-airflow>=1.2.0,<=1.22.0", "openlineage-airflow>=1.2.0,<=1.25.0",
}, },
} }
# Include datahub-rest in the base requirements. # Require some plugins by default.
base_requirements.update(plugins["datahub-rest"]) base_requirements.update(plugins["datahub-rest"])
base_requirements.update(plugins["plugin-v2"])
mypy_stubs = { mypy_stubs = {
@ -109,6 +110,11 @@ integration_test_requirements = {
"apache-airflow-providers-sqlite", "apache-airflow-providers-sqlite",
} }
per_version_test_requirements = { per_version_test_requirements = {
"test-airflow23": {
"pendulum<3.0",
"Flask-Session<0.6.0",
"connexion<3.0",
},
"test-airflow24": { "test-airflow24": {
"pendulum<3.0", "pendulum<3.0",
"Flask-Session<0.6.0", "Flask-Session<0.6.0",

View File

@ -46,7 +46,7 @@ def get_task_inlets(operator: "Operator") -> List:
return operator._inlets # type: ignore[attr-defined, union-attr] return operator._inlets # type: ignore[attr-defined, union-attr]
if hasattr(operator, "get_inlet_defs"): if hasattr(operator, "get_inlet_defs"):
return operator.get_inlet_defs() # type: ignore[attr-defined] return operator.get_inlet_defs() # type: ignore[attr-defined]
return operator.inlets return operator.inlets or []
def get_task_outlets(operator: "Operator") -> List: def get_task_outlets(operator: "Operator") -> List:
@ -56,7 +56,7 @@ def get_task_outlets(operator: "Operator") -> List:
return operator._outlets # type: ignore[attr-defined, union-attr] return operator._outlets # type: ignore[attr-defined, union-attr]
if hasattr(operator, "get_outlet_defs"): if hasattr(operator, "get_outlet_defs"):
return operator.get_outlet_defs() return operator.get_outlet_defs()
return operator.outlets return operator.outlets or []
__all__ = [ __all__ = [

View File

@ -74,7 +74,7 @@ _RUN_IN_THREAD = os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD", "true").lower
"1", "1",
) )
_RUN_IN_THREAD_TIMEOUT = float( _RUN_IN_THREAD_TIMEOUT = float(
os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT", 15) os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT", 10)
) )
_DATAHUB_CLEANUP_DAG = "Datahub_Cleanup" _DATAHUB_CLEANUP_DAG = "Datahub_Cleanup"
@ -102,6 +102,7 @@ def get_airflow_plugin_listener() -> Optional["DataHubListener"]:
"capture_tags": plugin_config.capture_tags_info, "capture_tags": plugin_config.capture_tags_info,
"capture_ownership": plugin_config.capture_ownership_info, "capture_ownership": plugin_config.capture_ownership_info,
"enable_extractors": plugin_config.enable_extractors, "enable_extractors": plugin_config.enable_extractors,
"render_templates": plugin_config.render_templates,
"disable_openlineage_plugin": plugin_config.disable_openlineage_plugin, "disable_openlineage_plugin": plugin_config.disable_openlineage_plugin,
}, },
) )

View File

@ -14,7 +14,7 @@
"fileloc": "<fileloc>", "fileloc": "<fileloc>",
"is_paused_upon_creation": "None", "is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "None", "tags": "[]",
"timezone": "Timezone('UTC')" "timezone": "Timezone('UTC')"
}, },
"externalUrl": "http://airflow.example.com/tree?dag_id=basic_iolets", "externalUrl": "http://airflow.example.com/tree?dag_id=basic_iolets",
@ -83,7 +83,7 @@
"execution_timeout": "None", "execution_timeout": "None",
"sla": "None", "sla": "None",
"task_id": "'run_data_task'", "task_id": "'run_data_task'",
"trigger_rule": "'all_success'", "trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False", "wait_for_downstream": "False",
"downstream_task_ids": "[]", "downstream_task_ids": "[]",
"inlets": "[]", "inlets": "[]",
@ -246,6 +246,46 @@
} }
} }
}, },
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'run_data_task'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'run_data_task'",
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[]",
"outlets": "[]"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task",
"name": "run_data_task",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{ {
"entityType": "dataJob", "entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
@ -402,16 +442,16 @@
"state": "success", "state": "success",
"operator": "BashOperator", "operator": "BashOperator",
"priority_weight": "1", "priority_weight": "1",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets", "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1",
"orchestrator": "airflow", "orchestrator": "airflow",
"dag_id": "basic_iolets", "dag_id": "basic_iolets",
"task_id": "run_data_task" "task_id": "run_data_task"
}, },
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets", "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1",
"name": "basic_iolets_run_data_task_manual_run_test", "name": "basic_iolets_run_data_task_manual_run_test",
"type": "BATCH_AD_HOC", "type": "BATCH_AD_HOC",
"created": { "created": {
"time": 1717180290951, "time": 1733529136396,
"actor": "urn:li:corpuser:datahub" "actor": "urn:li:corpuser:datahub"
} }
} }
@ -544,7 +584,7 @@
"aspectName": "dataProcessInstanceRunEvent", "aspectName": "dataProcessInstanceRunEvent",
"aspect": { "aspect": {
"json": { "json": {
"timestampMillis": 1717180290951, "timestampMillis": 1733529136396,
"partitionSpec": { "partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT", "partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE" "type": "FULL_TABLE"
@ -561,7 +601,7 @@
"aspectName": "dataProcessInstanceRunEvent", "aspectName": "dataProcessInstanceRunEvent",
"aspect": { "aspect": {
"json": { "json": {
"timestampMillis": 1717180291140, "timestampMillis": 1733529137385,
"partitionSpec": { "partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT", "partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE" "type": "FULL_TABLE"

View File

@ -14,7 +14,7 @@
"fileloc": "<fileloc>", "fileloc": "<fileloc>",
"is_paused_upon_creation": "None", "is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "None", "tags": "[]",
"timezone": "Timezone('UTC')" "timezone": "Timezone('UTC')"
}, },
"externalUrl": "http://airflow.example.com/tree?dag_id=simple_dag", "externalUrl": "http://airflow.example.com/tree?dag_id=simple_dag",
@ -84,7 +84,7 @@
"execution_timeout": "None", "execution_timeout": "None",
"sla": "None", "sla": "None",
"task_id": "'task_1'", "task_id": "'task_1'",
"trigger_rule": "'all_success'", "trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False", "wait_for_downstream": "False",
"downstream_task_ids": "['run_another_data_task']", "downstream_task_ids": "['run_another_data_task']",
"inlets": "[]", "inlets": "[]",
@ -205,6 +205,46 @@
} }
} }
}, },
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'task_1'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'task_1'",
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "['run_another_data_task']",
"inlets": "[]",
"outlets": "[]"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1",
"name": "task_1",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{ {
"entityType": "dataJob", "entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
@ -319,16 +359,16 @@
"state": "success", "state": "success",
"operator": "BashOperator", "operator": "BashOperator",
"priority_weight": "2", "priority_weight": "2",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag", "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1",
"orchestrator": "airflow", "orchestrator": "airflow",
"dag_id": "simple_dag", "dag_id": "simple_dag",
"task_id": "task_1" "task_id": "task_1"
}, },
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag", "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1",
"name": "simple_dag_task_1_manual_run_test", "name": "simple_dag_task_1_manual_run_test",
"type": "BATCH_AD_HOC", "type": "BATCH_AD_HOC",
"created": { "created": {
"time": 1717180227827, "time": 1733528983395,
"actor": "urn:li:corpuser:datahub" "actor": "urn:li:corpuser:datahub"
} }
} }
@ -419,7 +459,7 @@
"aspectName": "dataProcessInstanceRunEvent", "aspectName": "dataProcessInstanceRunEvent",
"aspect": { "aspect": {
"json": { "json": {
"timestampMillis": 1717180227827, "timestampMillis": 1733528983395,
"partitionSpec": { "partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT", "partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE" "type": "FULL_TABLE"
@ -436,7 +476,7 @@
"aspectName": "dataProcessInstanceRunEvent", "aspectName": "dataProcessInstanceRunEvent",
"aspect": { "aspect": {
"json": { "json": {
"timestampMillis": 1717180228022, "timestampMillis": 1733528984355,
"partitionSpec": { "partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT", "partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE" "type": "FULL_TABLE"
@ -449,6 +489,42 @@
} }
} }
}, },
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {
"_access_control": "None",
"catchup": "False",
"description": "'A simple DAG that runs a few fake data tasks.'",
"doc_md": "None",
"fileloc": "<fileloc>",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "[]",
"timezone": "Timezone('UTC')"
},
"externalUrl": "http://airflow.example.com/tree?dag_id=simple_dag",
"name": "simple_dag",
"description": "A simple DAG that runs a few fake data tasks.",
"env": "PROD"
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{ {
"entityType": "dataFlow", "entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)", "entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
@ -498,7 +574,7 @@
"execution_timeout": "None", "execution_timeout": "None",
"sla": "None", "sla": "None",
"task_id": "'run_another_data_task'", "task_id": "'run_another_data_task'",
"trigger_rule": "'all_success'", "trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False", "wait_for_downstream": "False",
"downstream_task_ids": "[]", "downstream_task_ids": "[]",
"inlets": "[]", "inlets": "[]",
@ -575,6 +651,46 @@
} }
} }
}, },
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'run_another_data_task'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'run_another_data_task'",
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[]",
"outlets": "[]"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=run_another_data_task",
"name": "run_another_data_task",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{ {
"entityType": "dataJob", "entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
@ -645,16 +761,16 @@
"state": "success", "state": "success",
"operator": "BashOperator", "operator": "BashOperator",
"priority_weight": "1", "priority_weight": "1",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag", "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1",
"orchestrator": "airflow", "orchestrator": "airflow",
"dag_id": "simple_dag", "dag_id": "simple_dag",
"task_id": "run_another_data_task" "task_id": "run_another_data_task"
}, },
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag", "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1",
"name": "simple_dag_run_another_data_task_manual_run_test", "name": "simple_dag_run_another_data_task_manual_run_test",
"type": "BATCH_AD_HOC", "type": "BATCH_AD_HOC",
"created": { "created": {
"time": 1717180231676, "time": 1733528992448,
"actor": "urn:li:corpuser:datahub" "actor": "urn:li:corpuser:datahub"
} }
} }
@ -679,7 +795,7 @@
"aspectName": "dataProcessInstanceRunEvent", "aspectName": "dataProcessInstanceRunEvent",
"aspect": { "aspect": {
"json": { "json": {
"timestampMillis": 1717180231676, "timestampMillis": 1733528992448,
"partitionSpec": { "partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT", "partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE" "type": "FULL_TABLE"
@ -696,7 +812,7 @@
"aspectName": "dataProcessInstanceRunEvent", "aspectName": "dataProcessInstanceRunEvent",
"aspect": { "aspect": {
"json": { "json": {
"timestampMillis": 1717180231824, "timestampMillis": 1733528993380,
"partitionSpec": { "partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT", "partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE" "type": "FULL_TABLE"

View File

@ -12,6 +12,7 @@ import textwrap
import time import time
from typing import Any, Iterator, Sequence from typing import Any, Iterator, Sequence
import packaging.version
import pytest import pytest
import requests import requests
import tenacity import tenacity
@ -20,6 +21,7 @@ from datahub.ingestion.sink.file import write_metadata_file
from datahub.testing.compare_metadata_json import assert_metadata_files_equal from datahub.testing.compare_metadata_json import assert_metadata_files_equal
from datahub_airflow_plugin._airflow_shims import ( from datahub_airflow_plugin._airflow_shims import (
AIRFLOW_VERSION,
HAS_AIRFLOW_DAG_LISTENER_API, HAS_AIRFLOW_DAG_LISTENER_API,
HAS_AIRFLOW_LISTENER_API, HAS_AIRFLOW_LISTENER_API,
HAS_AIRFLOW_STANDALONE_CMD, HAS_AIRFLOW_STANDALONE_CMD,
@ -242,6 +244,7 @@ def _run_airflow(
# Note that we could also disable the RUN_IN_THREAD entirely, # Note that we could also disable the RUN_IN_THREAD entirely,
# but I want to minimize the difference between CI and prod. # but I want to minimize the difference between CI and prod.
"DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT": "30", "DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT": "30",
"DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN": "true" if is_v1 else "false",
# Convenience settings. # Convenience settings.
"AIRFLOW__DATAHUB__LOG_LEVEL": "DEBUG", "AIRFLOW__DATAHUB__LOG_LEVEL": "DEBUG",
"AIRFLOW__DATAHUB__DEBUG_EMITTER": "True", "AIRFLOW__DATAHUB__DEBUG_EMITTER": "True",
@ -361,7 +364,6 @@ test_cases = [
@pytest.mark.parametrize( @pytest.mark.parametrize(
["golden_filename", "test_case", "is_v1"], ["golden_filename", "test_case", "is_v1"],
[ [
# On Airflow <= 2.2, test plugin v1.
*[ *[
pytest.param( pytest.param(
f"v1_{test_case.dag_id}", f"v1_{test_case.dag_id}",
@ -369,8 +371,8 @@ test_cases = [
True, True,
id=f"v1_{test_case.dag_id}", id=f"v1_{test_case.dag_id}",
marks=pytest.mark.skipif( marks=pytest.mark.skipif(
HAS_AIRFLOW_LISTENER_API, AIRFLOW_VERSION >= packaging.version.parse("2.4.0"),
reason="Not testing plugin v1 on newer Airflow versions", reason="We only test the v1 plugin on Airflow 2.3",
), ),
) )
for test_case in test_cases for test_case in test_cases
@ -391,10 +393,18 @@ test_cases = [
if HAS_AIRFLOW_DAG_LISTENER_API if HAS_AIRFLOW_DAG_LISTENER_API
else f"v2_{test_case.dag_id}_no_dag_listener" else f"v2_{test_case.dag_id}_no_dag_listener"
), ),
marks=pytest.mark.skipif( marks=[
not HAS_AIRFLOW_LISTENER_API, pytest.mark.skipif(
reason="Cannot test plugin v2 without the Airflow plugin listener API", not HAS_AIRFLOW_LISTENER_API,
), reason="Cannot test plugin v2 without the Airflow plugin listener API",
),
pytest.mark.skipif(
AIRFLOW_VERSION < packaging.version.parse("2.4.0"),
reason="We skip testing the v2 plugin on Airflow 2.3 because it causes flakiness in the custom properties. "
"Ideally we'd just fix these, but given that Airflow 2.3 is EOL and likely going to be deprecated "
"soon anyways, it's not worth the effort.",
),
],
) )
for test_case in test_cases for test_case in test_cases
], ],

View File

@ -4,17 +4,24 @@
# and then run "tox" from this directory. # and then run "tox" from this directory.
[tox] [tox]
envlist = py38-airflow21, py38-airflow22, py310-airflow24, py310-airflow26, py310-airflow27, py310-airflow28, py311-airflow29, py311-airflow210 envlist = py38-airflow23, py310-airflow24, py310-airflow26, py310-airflow27, py310-airflow28, py311-airflow29, py311-airflow210
[testenv] [testenv]
use_develop = true use_develop = true
extras = dev,integration-tests,plugin-v1 extras =
dev
integration-tests
plugin-v1
plugin-v2
# For Airflow 2.3 and 2.4, add a few extra requirements.
airflow23: test-airflow23
airflow24: test-airflow24
deps = deps =
# This should be kept in sync with the Github Actions matrix. # This should be kept in sync with the Github Actions matrix.
-e ../../metadata-ingestion/ -e ../../metadata-ingestion/
# Airflow version # Airflow version
airflow21: apache-airflow~=2.1.0 airflow23: apache-airflow~=2.3.0
airflow22: apache-airflow~=2.2.0
airflow24: apache-airflow~=2.4.0 airflow24: apache-airflow~=2.4.0
airflow26: apache-airflow~=2.6.0 airflow26: apache-airflow~=2.6.0
airflow27: apache-airflow~=2.7.0 airflow27: apache-airflow~=2.7.0
@ -23,7 +30,8 @@ deps =
airflow210: apache-airflow~=2.10.0 airflow210: apache-airflow~=2.10.0
# Respect the Airflow constraints files. # Respect the Airflow constraints files.
# We can't make ourselves work with the constraints of Airflow < 2.3. # We can't make ourselves work with the constraints of Airflow <= 2.3.
; py38-airflow23: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.3.4/constraints-3.8.txt
# The Airflow 2.4 constraints file requires a version of the sqlite provider whose # The Airflow 2.4 constraints file requires a version of the sqlite provider whose
# hook type is missing the `conn_name_attr` property. # hook type is missing the `conn_name_attr` property.
; py310-airflow24: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.4.3/constraints-3.10.txt ; py310-airflow24: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.4.3/constraints-3.10.txt
@ -31,7 +39,7 @@ deps =
py310-airflow27: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt py310-airflow27: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt
py310-airflow28: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt py310-airflow28: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt
py311-airflow29: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt py311-airflow29: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt
py311-airflow210: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.11.txt py311-airflow210: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.3/constraints-3.11.txt
# Before pinning to the constraint files, we previously left the dependencies # Before pinning to the constraint files, we previously left the dependencies
# more open. There were a number of packages for which this caused issues. # more open. There were a number of packages for which this caused issues.
@ -54,11 +62,3 @@ deps =
; airflow24,airflow26,airflow27,airflow28: Flask-Session<0.6.0 ; airflow24,airflow26,airflow27,airflow28: Flask-Session<0.6.0
commands = commands =
pytest --cov-append {posargs} pytest --cov-append {posargs}
# For Airflow 2.4+, add the plugin-v2 extra.
[testenv:py310-airflow24]
extras = dev,integration-tests,plugin-v2,test-airflow24
[testenv:py3{10,11}-airflow{26,27,28,29,210}]
extras = dev,integration-tests,plugin-v2