feat(ingest): drop plugin support for airflow 1.x (#6331)

Harshal Sheth authored on 2022-11-01 21:12:34 -07:00; committed by GitHub
parent 0a0cf70b6f
commit b4687ffceb
17 changed files with 36 additions and 355 deletions

View File

@ -20,44 +20,20 @@ concurrency:
cancel-in-progress: true
jobs:
metadata-ingestion-general:
runs-on: ubuntu-latest
env:
SPARK_VERSION: 3.0.3
DATAHUB_TELEMETRY_ENABLED: false
strategy:
matrix:
python-version: ["3.7", "3.10.6"]
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: ./metadata-ingestion/scripts/install_deps.sh
- name: Run metadata-ingestion tests
run: ./gradlew :metadata-ingestion:build :metadata-ingestion:testQuick :metadata-ingestion:check
- uses: actions/upload-artifact@v3
if: always()
with:
name: Test Results (metadata ingestion ${{ matrix.python-version }} testQuick)
path: |
**/build/reports/tests/test/**
**/build/test-results/test/**
**/junit.*.xml
metadata-ingestion:
runs-on: ubuntu-latest
env:
SPARK_VERSION: 3.0.3
DATAHUB_TELEMETRY_ENABLED: false
DATAHUB_LOOKML_GIT_TEST_SSH_KEY: ${{ secrets.DATAHUB_LOOKML_GIT_TEST_SSH_KEY }}
# TODO: Enable this once the test is fixed.
# DATAHUB_LOOKML_GIT_TEST_SSH_KEY: ${{ secrets.DATAHUB_LOOKML_GIT_TEST_SSH_KEY }}
strategy:
matrix:
python-version: ["3.7", "3.10"]
command:
[
"installAirflow1",
"lint",
"testQuick",
"testIntegration",
"testIntegrationBatch1",
"testSlowIntegration",
@ -75,12 +51,12 @@ jobs:
- name: Install dependencies
run: ./metadata-ingestion/scripts/install_deps.sh
- name: Run metadata-ingestion tests
run: ./gradlew :metadata-ingestion:build :metadata-ingestion:${{ matrix.command }} -x:metadata-ingestion:testQuick -x:metadata-ingestion:check
run: ./gradlew :metadata-ingestion:build :metadata-ingestion:${{ matrix.command }}
- name: pip freeze show list installed
if: always()
run: source metadata-ingestion/venv/bin/activate && pip freeze
- uses: actions/upload-artifact@v3
if: always()
if: ${{ always() && matrix.command != 'lint' }}
with:
name: Test Results (metadata ingestion ${{ matrix.python-version }})
path: |

View File

@ -6,6 +6,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- LookML source will only emit views that are reachable from explores while scanning your git repo. Previous behavior can be achieved by setting `emit_reachable_views_only` to False.
- LookML source will always lowercase urns for lineage edges from views to upstream tables. There is no fallback provided to previous behavior because it was inconsistent in application of lower-casing earlier.
- dbt config `node_type_pattern` which was previously deprecated has been removed. Use `entities_enabled` instead to control whether to emit metadata for sources, models, seeds, tests, etc.
- The DataHub Airflow lineage backend and plugin no longer support Airflow 1.x. You can still run DataHub ingestion in Airflow 1.x using the [PythonVirtualenvOperator](https://airflow.apache.org/docs/apache-airflow/1.10.15/_api/airflow/operators/python_operator/index.html?highlight=pythonvirtualenvoperator#airflow.operators.python_operator.PythonVirtualenvOperator).
### Breaking Changes
- Java version 11 or greater is required.
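
As an aside on the PythonVirtualenvOperator escape hatch mentioned in the breaking-change note above: below is a minimal sketch, not part of this commit, of running DataHub ingestion on Airflow 1.10.x that way. The MySQL recipe and the GMS address are placeholders; the key point is that the operator builds an isolated virtualenv, so all DataHub imports must live inside the callable.

```python
# Hedged sketch: DataHub ingestion on Airflow 1.10.x via PythonVirtualenvOperator.
# Source/sink values are placeholders for illustration only.
from airflow import DAG
from airflow.operators.python_operator import PythonVirtualenvOperator  # Airflow 1.10.x import path
from airflow.utils.dates import days_ago


def run_datahub_ingestion():
    # Imports must live inside the callable: it runs in a freshly built virtualenv.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(
        {
            "source": {"type": "mysql", "config": {"host_port": "localhost:3306"}},
            "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
        }
    )
    pipeline.run()
    pipeline.raise_from_status()


with DAG(
    dag_id="datahub_ingestion_airflow1",
    start_date=days_ago(1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    PythonVirtualenvOperator(
        task_id="run_datahub_ingestion",
        python_callable=run_datahub_ingestion,
        requirements=["acryl-datahub[mysql,datahub-rest]"],
        system_site_packages=False,
    )
```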

View File

@ -1,13 +1,9 @@
import os
import pathlib
import sys
from typing import Dict, Set
import setuptools
is_py37_or_newer = sys.version_info >= (3, 7)
package_metadata: dict = {}
with open("./src/datahub_airflow_plugin/__init__.py") as fp:
exec(fp.read(), package_metadata)
@ -26,7 +22,7 @@ base_requirements = {
# Actual dependencies.
"typing-inspect",
"pydantic>=1.5.1",
"apache-airflow >= 1.10.2",
"apache-airflow >= 2.0.2",
"acryl-datahub[airflow] >= 0.8.36",
# Pinned dependencies to make dependency resolution faster.
"sqlalchemy==1.3.24",
@ -77,20 +73,10 @@ base_dev_requirements = {
"packaging",
}
base_dev_requirements_airflow_1 = base_dev_requirements.copy()
dev_requirements = {
*base_dev_requirements,
}
dev_requirements_airflow_1_base = {
"apache-airflow==1.10.15",
"apache-airflow-backport-providers-snowflake",
}
dev_requirements_airflow_1 = {
*base_dev_requirements_airflow_1,
*dev_requirements_airflow_1_base,
}
entry_points = {
"airflow.plugins": "acryl-datahub-airflow-plugin = datahub_airflow_plugin.datahub_plugin:DatahubPlugin"
@ -119,6 +105,7 @@ setuptools.setup(
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Intended Audience :: Developers",
"Intended Audience :: Information Technology",
"Intended Audience :: System Administrators",
@ -140,7 +127,5 @@ setuptools.setup(
install_requires=list(base_requirements),
extras_require={
"dev": list(dev_requirements),
"dev-airflow1-base": list(dev_requirements_airflow_1_base),
"dev-airflow1": list(dev_requirements_airflow_1),
},
)
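
For context on the `airflow.plugins` entry point kept above: Airflow 2.x discovers plugins declared under that entry point group at parse time, with no `airflow_local_settings` changes required. A generic sketch of the class shape Airflow expects follows; the real `DatahubPlugin` implementation is not shown in this diff.

```python
# Illustrative only; the actual class lives in datahub_airflow_plugin.datahub_plugin.
from airflow.plugins_manager import AirflowPlugin


class ExamplePlugin(AirflowPlugin):
    # Airflow 2.x loads any AirflowPlugin subclass exposed via the
    # "airflow.plugins" entry point group declared in setup.py.
    name = "example_plugin"
```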

View File

@ -11,7 +11,7 @@ from airflow.utils.module_loading import import_string
from cattr import structure
from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult
from datahub_provider.client.airflow_generator import AirflowGenerator
from datahub_provider.hooks.datahub import AIRFLOW_1, DatahubGenericHook
from datahub_provider.hooks.datahub import DatahubGenericHook
from datahub_provider.lineage.datahub import DatahubLineageConfig
@ -40,38 +40,6 @@ def get_lineage_config() -> DatahubLineageConfig:
def get_inlets_from_task(task: BaseOperator, context: Any) -> Iterable[Any]:
inlets = []
needs_repeat_preparation = False
if (
not AIRFLOW_1
and isinstance(task._inlets, list)
and len(task._inlets) == 1
and isinstance(task._inlets[0], dict)
):
# This is necessary to avoid issues with circular imports.
from airflow.lineage import AUTO, prepare_lineage
task._inlets = [
# See https://airflow.apache.org/docs/apache-airflow/1.10.15/lineage.html.
*task._inlets[0].get("datasets", []), # assumes these are attr-annotated
*task._inlets[0].get("task_ids", []),
*([AUTO] if task._inlets[0].get("auto", False) else []),
]
needs_repeat_preparation = True
if (
not AIRFLOW_1
and isinstance(task._outlets, list)
and len(task._outlets) == 1
and isinstance(task._outlets[0], dict)
):
task._outlets = [*task._outlets[0].get("datasets", [])]
needs_repeat_preparation = True
if needs_repeat_preparation:
# Rerun the lineage preparation routine, now that the old format has been translated to the new one.
prepare_lineage(lambda self, ctx: None)(task, context)
context = context or {} # ensure not None to satisfy mypy
if isinstance(task._inlets, (str, BaseOperator)) or attr.has(task._inlets): # type: ignore
inlets = [
task._inlets,
@ -370,27 +338,13 @@ def _wrap_task_policy(policy):
return custom_task_policy
def set_airflow2_policies(settings):
def _patch_policy(settings):
print("Patching datahub policy")
if hasattr(settings, "task_policy"):
datahub_task_policy = _wrap_task_policy(settings.task_policy)
settings.task_policy = datahub_task_policy
def set_airflow1_policies(settings):
if hasattr(settings, "policy"):
datahub_task_policy = _wrap_task_policy(settings.policy)
settings.policy = datahub_task_policy
def _patch_policy(settings):
if AIRFLOW_1:
print("Patching datahub policy for Airflow 1")
set_airflow1_policies(settings)
else:
print("Patching datahub policy for Airflow 2")
set_airflow2_policies(settings)
def _patch_datahub_policy():
with contextlib.suppress(ImportError):
import airflow_local_settings
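
With the Airflow 1 branch removed, only `settings.task_policy` (the Airflow 2.x cluster policy hook) is patched. The rough shape of that wrapping pattern is sketched below; the body of the plugin's real `_wrap_task_policy` is not shown in this hunk, so the callback detail is an assumption.

```python
# Assumed shape of the Airflow 2.x policy-wrapping pattern; not copied from the plugin.
def wrap_task_policy(policy):
    def custom_task_policy(task):
        if policy:
            policy(task)  # preserve any user-defined cluster policy
        # ... DataHub would attach its lineage capture to `task` here ...

    return custom_task_policy


# Applied as in the hunk above:
# settings.task_policy = wrap_task_policy(settings.task_policy)
```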

View File

@ -4,12 +4,12 @@
# and then run "tox" from this directory.
[tox]
envlist = py3-quick,py3-full,py3-airflow1
envlist = py3-quick,py3-full
[gh-actions]
python =
3.6: py3-full, py3-airflow1
3.9: py3-full, py3-airflow1
3.6: py3-full
3.9: py3-full
# Providing optional features that add dependencies from setup.py as deps here
# allows tox to recreate testenv when new dependencies are added to setup.py.
@ -22,7 +22,7 @@ deps =
-e ../../metadata-ingestion/[.dev]
commands =
pytest --cov={envsitepackagesdir}/datahub --cov={envsitepackagesdir}/datahub_provider \
py3-quick,py3-airflow1: -m 'not integration and not slow_integration' --junit-xml=junit.quick.xml \
py3-quick: -m 'not integration and not slow_integration' --junit-xml=junit.quick.xml \
py3-full: --cov-fail-under 65 --junit-xml=junit.full.xml \
--continue-on-collection-errors \
-vv
@ -30,14 +30,6 @@ commands =
setenv =
AIRFLOW_HOME = /tmp/airflow/thisshouldnotexist-{envname}
[testenv:py3-airflow1]
deps =
../../metadata-ingestion/[.dev]
-c ../../metadata-ingestion/tests/airflow1-constraints.txt
setenv =
AIRFLOW1_TEST = true
[testenv:py3-full]
deps =
../../metadata-ingestion/.[dev]

View File

@ -109,14 +109,6 @@ task testSingle(dependsOn: [installDevTest]) {
}
}
task installAirflow1(type: Exec, dependsOn: [install]) {
inputs.file file('setup.py')
outputs.dir("${venv_name}")
outputs.file("${venv_name}/.build_install_airflow_sentinel")
commandLine 'bash', '-x', '-c',
"${venv_name}/bin/pip install -e .[dev-airflow1] -c tests/airflow1-constraints.txt && touch ${venv_name}/.build_install_airflow_sentinel"
}
task testIntegration(type: Exec, dependsOn: [installDevTest]) {
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && pytest --durations=50 -m 'integration' -vv --continue-on-collection-errors --junit-xml=junit.integration.xml"

View File

@ -1,38 +0,0 @@
#!/bin/bash
set -euo pipefail
HEADER=$(cat <<-EOF
#
# This file helps pip resolve dependencies for Airflow 1.x in a reasonable amount
# of time during testing. Without these constraints, pip will spend hours
# backtracking in an attempt to find a compatible list of versions.
# See https://pip.pypa.io/en/latest/topics/dependency-resolution/#backtracking
# for some explanation of backtracing with the new behavior in pip 20.3+.
#
EOF
)
# Setup a clean virtualenv and install dev deps.
../gradlew clean installDev
# Save a copy of the pip environment.
pip freeze > requirements-dev.txt
# Install Airflow 1.10.15. This will automatically uninstall all incompatible dependencies versions
# and replace them with compatible ones. One minor snag: we need to manually remove the Airflow
# 2.x providers that were split into separate packages, since pip won't remove those automatically.
pip uninstall -y apache-airflow-providers-http apache-airflow-providers-snowflake
pip install -e '.[dev-airflow1-base]'
# Save another copy of the pip environment.
pip freeze > requirements-dev-airflow1.txt
# Add updated dependencies to the constraints file.
# This gets all lines in dev-airflow1.txt that are not in dev.txt.
comm -23 requirements-dev-airflow1.txt requirements-dev.txt > airflow1-constraints-data.txt
# Add a timestamp and comment header to the top of the file.
(echo "# Generated by scripts/airflow1-constraints.sh on $(date)." && echo "$HEADER" && cat airflow1-constraints-data.txt) > tests/airflow1-constraints.txt
# Cleanup.
mv requirements-dev.txt requirements-dev-airflow1.txt airflow1-constraints-data.txt /tmp

View File

@ -28,7 +28,7 @@ base_requirements = {
}
framework_common = {
"click>=6.0.0",
"click>=7.1.2",
"click-default-group",
"PyYAML",
"toml>=0.10.0",
@ -43,11 +43,9 @@ framework_common = {
"tabulate",
"progressbar2",
"termcolor>=1.0.0",
"types-termcolor>=1.0.0",
"psutil>=5.8.0",
"ratelimiter",
"Deprecated",
"types-Deprecated",
"humanfriendly",
"packaging",
"aiohttp<4",
@ -99,7 +97,6 @@ kafka_protobuf = {
# Required to generate protobuf python modules from the schema downloaded from the schema registry
"grpcio==1.44.0",
"grpcio-tools==1.44.0",
"types-protobuf",
}
sql_common = {
@ -218,7 +215,7 @@ plugins: Dict[str, Set[str]] = {
"datahub-rest": {"requests"},
# Integrations.
"airflow": {
"apache-airflow >= 1.10.2",
"apache-airflow >= 2.0.2",
},
"circuit-breaker": {
"gql>=3.3.0",
@ -233,12 +230,13 @@ plugins: Dict[str, Set[str]] = {
| bigquery_common
| {"sqlalchemy-bigquery>=1.4.1", "sqllineage==1.3.6", "sqlparse"},
"bigquery-usage-legacy": bigquery_common | usage_common | {"cachetools"},
"bigquery": sql_common
| bigquery_common
| {"sqllineage==1.3.6", "sql_metadata"},
"bigquery": sql_common | bigquery_common | {"sqllineage==1.3.6", "sql_metadata"},
"bigquery-beta": sql_common
| bigquery_common
| {"sqllineage==1.3.6", "sql_metadata"}, # deprecated, but keeping the extra for backwards compatibility
| {
"sqllineage==1.3.6",
"sql_metadata",
}, # deprecated, but keeping the extra for backwards compatibility
"clickhouse": sql_common | {"clickhouse-sqlalchemy==0.1.8"},
"clickhouse-usage": sql_common
| usage_common
@ -358,6 +356,9 @@ mypy_stubs = {
"types-pyOpenSSL",
"types-click-spinner",
"types-ujson>=5.2.0",
"types-termcolor>=1.0.0",
"types-Deprecated",
"types-protobuf",
}
base_dev_requirements = {
@ -396,6 +397,7 @@ base_dev_requirements = {
"delta-lake",
"druid",
"elasticsearch",
"feast",
"iceberg",
"ldap",
"looker",
@ -427,35 +429,11 @@ base_dev_requirements = {
),
}
base_dev_requirements_airflow_1 = base_dev_requirements.copy()
base_dev_requirements = base_dev_requirements.union(
# The feast plugin is not compatible with Airflow 1, so we add it later.
{
dependency
for plugin in [
"feast",
]
for dependency in plugins[plugin]
}
)
dev_requirements = {
*base_dev_requirements,
"apache-airflow[snowflake]>=2.0.2", # snowflake is used in example dags
"snowflake-sqlalchemy<=1.2.4", # make constraint consistent with extras
}
dev_requirements_airflow_1_base = {
"apache-airflow==1.10.15",
"apache-airflow-backport-providers-snowflake",
"snowflake-sqlalchemy<=1.2.4", # make constraint consistent with extras
"WTForms==2.3.3", # make constraint consistent with extras
}
dev_requirements_airflow_1 = {
*base_dev_requirements_airflow_1,
*dev_requirements_airflow_1_base,
}
full_test_dev_requirements = {
*list(
@ -627,8 +605,6 @@ setuptools.setup(
)
),
"dev": list(dev_requirements),
"dev-airflow1-base": list(dev_requirements_airflow_1_base),
"dev-airflow1": list(dev_requirements_airflow_1),
"integration-tests": list(full_test_dev_requirements),
},
)

View File

@ -11,14 +11,6 @@ class _Entity:
def urn(self) -> str:
pass
def set_context(self, context):
# Required for compat with Airflow 1.10.x
pass
def as_dict(self):
# Required for compat with Airflow 1.10.x
return attr.asdict(self)
@attr.s(auto_attribs=True, str=True)
class Dataset(_Entity):

View File

@ -7,12 +7,7 @@ DataHub ingestion pipeline within an Airflow DAG.
from datetime import timedelta
from airflow import DAG
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datahub.configuration.config_loader import load_config_file

View File

@ -6,13 +6,9 @@ An example DAG demonstrating the usage of DataHub's Airflow lineage backend.
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
try:
from airflow.operators.bash import BashOperator
except ModuleNotFoundError:
from airflow.operators.bash_operator import BashOperator
from datahub_provider.entities import Dataset
default_args = {

View File

@ -1,7 +1,6 @@
"""Lineage Backend
An example DAG demonstrating the usage of DataHub's Airflow lineage backend using the TaskFlow API.
This example only works with Airflow 2.x. See https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html.
"""
from datetime import timedelta

View File

@ -8,13 +8,9 @@ embedded within the code.
from datetime import timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
from airflow.operators.python_operator import PythonOperator
from datahub.ingestion.run.pipeline import Pipeline
default_args = {

View File

@ -1,15 +1,7 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from airflow.exceptions import AirflowException
try:
from airflow.hooks.base import BaseHook
AIRFLOW_1 = False
except ModuleNotFoundError:
from airflow.hooks.base_hook import BaseHook
AIRFLOW_1 = True
from airflow.hooks.base import BaseHook
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
@ -22,11 +14,6 @@ if TYPE_CHECKING:
from datahub.ingestion.sink.datahub_kafka import KafkaSinkConfig
_default_hook_args = []
if AIRFLOW_1:
_default_hook_args = [None]
class DatahubRestHook(BaseHook):
"""
Creates a DataHub Rest API connection used to send metadata to DataHub.
@ -46,7 +33,7 @@ class DatahubRestHook(BaseHook):
hook_name = "DataHub REST Server"
def __init__(self, datahub_rest_conn_id: str = default_conn_name) -> None:
super().__init__(*_default_hook_args)
super().__init__()
self.datahub_rest_conn_id = datahub_rest_conn_id
@staticmethod
@ -108,7 +95,7 @@ class DatahubKafkaHook(BaseHook):
hook_name = "DataHub Kafka Sink"
def __init__(self, datahub_kafka_conn_id: str = default_conn_name) -> None:
super().__init__(*_default_hook_args)
super().__init__()
self.datahub_kafka_conn_id = datahub_kafka_conn_id
@staticmethod
@ -191,7 +178,7 @@ class DatahubGenericHook(BaseHook):
"""
def __init__(self, datahub_conn_id: str) -> None:
super().__init__(*_default_hook_args)
super().__init__()
self.datahub_conn_id = datahub_conn_id
def get_underlying_hook(self) -> Union[DatahubRestHook, DatahubKafkaHook]:
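
With the `_default_hook_args` shim gone, the hooks call `BaseHook.__init__()` with no arguments, which is only valid on Airflow 2.x. A hedged usage sketch follows; it assumes the hook's `default_conn_name` is `datahub_rest_default` (not shown in this hunk), and the dataset URNs are made up.

```python
# Hedged sketch of emitting metadata through the hook-backed operator.
# Instantiate inside a DAG (or `with DAG(...)` block) as usual.
import datahub.emitter.mce_builder as builder
from datahub_provider.operators.datahub import DatahubEmitterOperator

emit_lineage = DatahubEmitterOperator(
    task_id="emit_lineage",
    datahub_conn_id="datahub_rest_default",  # assumed default REST connection id
    mces=[
        builder.make_lineage_mce(
            upstream_urns=[builder.make_dataset_urn("mysql", "db.schema.upstream")],
            downstream_urn=builder.make_dataset_urn("mysql", "db.schema.downstream"),
        )
    ],
)
```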

View File

@ -77,44 +77,6 @@ class DatahubLineageBackend(LineageBackend):
return
try:
# This is necessary to avoid issues with circular imports.
from airflow.lineage import prepare_lineage
from datahub_provider.hooks.datahub import AIRFLOW_1
# Detect Airflow 1.10.x inlet/outlet configurations in Airflow 2.x, and
# convert to the newer version. This code path will only be triggered
# when 2.x receives a 1.10.x inlet/outlet config.
needs_repeat_preparation = False
if (
not AIRFLOW_1
and isinstance(operator._inlets, list)
and len(operator._inlets) == 1
and isinstance(operator._inlets[0], dict)
):
from airflow.lineage import AUTO
operator._inlets = [
# See https://airflow.apache.org/docs/apache-airflow/1.10.15/lineage.html.
*operator._inlets[0].get(
"datasets", []
), # assumes these are attr-annotated
*operator._inlets[0].get("task_ids", []),
*([AUTO] if operator._inlets[0].get("auto", False) else []),
]
needs_repeat_preparation = True
if (
not AIRFLOW_1
and isinstance(operator._outlets, list)
and len(operator._outlets) == 1
and isinstance(operator._outlets[0], dict)
):
operator._outlets = [*operator._outlets[0].get("datasets", [])]
needs_repeat_preparation = True
if needs_repeat_preparation:
# Rerun the lineage preparation routine, now that the old format has been translated to the new one.
prepare_lineage(lambda self, ctx: None)(operator, context)
context = context or {} # ensure not None to satisfy mypy
send_lineage_to_datahub(
config, operator, operator.inlets, operator.outlets, context
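
Since the dict-based 1.10.x inlet/outlet translation above is removed, the lineage backend now only accepts the Airflow 2.x list form. A small sketch, reusing the dataset names from the tests later in this diff:

```python
# Airflow 2.x list-style lineage, the only form the backend still handles.
from airflow.operators.bash import BashOperator
from datahub_provider.entities import Dataset

transform = BashOperator(
    task_id="transform",
    bash_command="echo transform",
    inlets=[Dataset("snowflake", "mydb.schema.tableConsumed")],
    outlets=[Dataset("snowflake", "mydb.schema.tableProduced")],
)
```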

View File

@ -1,49 +0,0 @@
# Generated by scripts/airflow1-constraints.sh on Fri Feb 11 13:16:30 EST 2022.
#
# This file helps pip resolve dependencies for Airflow 1.x in a reasonable amount
# of time during testing. Without these constraints, pip will spend hours
# backtracking in an attempt to find a compatible list of versions.
# See https://pip.pypa.io/en/latest/topics/dependency-resolution/#backtracking
# for some explanation of backtracing with the new behavior in pip 20.3+.
#
apache-airflow==1.10.15
apache-airflow-backport-providers-snowflake==2021.3.13
apache-airflow-providers-ftp==2.0.1
apache-airflow-providers-imap==2.1.0
apache-airflow-providers-sqlite==2.0.1
apispec==1.3.3
cached-property==1.5.2
chardet==3.0.4
click==7.1.2
click-default-group==1.2.2
colorlog==4.0.2
configparser==3.5.3
croniter==0.3.37
Flask-Admin==1.5.4
Flask-AppBuilder==2.3.4
Flask-Babel==1.0.0
Flask-Caching==1.3.3
flask-swagger==0.2.14
Flask-WTF==0.14.3
funcsigs==1.0.2
idna==2.10
importlib-resources==1.5.0
Jinja2==2.11.3
json-merge-patch==0.2
lazy-object-proxy==1.4.3
Markdown==2.6.11
marshmallow==2.21.0
marshmallow-enum==1.5.1
marshmallow-oneofschema==3.0.1
marshmallow-sqlalchemy==0.23.1
natsort==8.1.0
pendulum==1.4.4
requests==2.23.0
requests-mock==1.9.3
requests-toolbelt==0.9.1
SQLAlchemy-JSONField==0.9.0
tenacity==4.12.0
tzlocal==1.5.1
urllib3==1.25.11
Werkzeug==0.16.1
zope.deprecation==4.4.0

View File

@ -23,7 +23,7 @@ except ModuleNotFoundError:
import datahub.emitter.mce_builder as builder
from datahub_provider import get_provider_info
from datahub_provider.entities import Dataset
from datahub_provider.hooks.datahub import AIRFLOW_1, DatahubKafkaHook, DatahubRestHook
from datahub_provider.hooks.datahub import DatahubKafkaHook, DatahubRestHook
from datahub_provider.operators.datahub import DatahubEmitterOperator
# Approach suggested by https://stackoverflow.com/a/11887885/5004662.
@ -81,14 +81,6 @@ def test_dags_load_with_no_errors(pytestconfig):
dag_bag = DagBag(dag_folder=str(airflow_examples_folder), include_examples=False)
import_errors = dag_bag.import_errors
if AIRFLOW_1:
# The TaskFlow API is new in Airflow 2.x, so we don't expect that demo DAG
# to work on earlier versions.
import_errors = {
dag_filename: dag_errors
for dag_filename, dag_errors in import_errors.items()
if "taskflow" not in dag_filename
}
assert import_errors == {}
assert len(dag_bag.dag_ids) > 0
@ -180,22 +172,9 @@ def test_hook_airflow_ui(hook):
["inlets", "outlets"],
[
pytest.param(
# Airflow 1.10.x uses a dictionary structure for inlets and outlets.
# We want the lineage backend to support this structure for backwards
# compatability reasons, so this test is not conditional.
{"datasets": [Dataset("snowflake", "mydb.schema.tableConsumed")]},
{"datasets": [Dataset("snowflake", "mydb.schema.tableProduced")]},
id="airflow-1-10-lineage-syntax",
),
pytest.param(
# Airflow 2.x also supports a flattened list for inlets and outlets.
# We want to test this capability.
# Airflow 2.x uses a flattened list for inlets and outlets.
[Dataset("snowflake", "mydb.schema.tableConsumed")],
[Dataset("snowflake", "mydb.schema.tableProduced")],
marks=pytest.mark.skipif(
AIRFLOW_VERSION < packaging.version.parse("2.0.0"),
reason="list-style lineage is only supported in Airflow 2.x",
),
id="airflow-2-lineage-syntax",
),
],
@ -344,22 +323,8 @@ def test_lineage_backend(mock_emit, inlets, outlets):
["inlets", "outlets"],
[
pytest.param(
# Airflow 1.10.x uses a dictionary structure for inlets and outlets.
# We want the lineage backend to support this structure for backwards
# compatability reasons, so this test is not conditional.
{"datasets": [Dataset("snowflake", "mydb.schema.tableConsumed")]},
{"datasets": [Dataset("snowflake", "mydb.schema.tableProduced")]},
id="airflow-1-10-lineage-syntax",
),
pytest.param(
# Airflow 2.x also supports a flattened list for inlets and outlets.
# We want to test this capability.
[Dataset("snowflake", "mydb.schema.tableConsumed")],
[Dataset("snowflake", "mydb.schema.tableProduced")],
marks=pytest.mark.skipif(
AIRFLOW_VERSION < packaging.version.parse("2.0.0"),
reason="list-style lineage is only supported in Airflow 2.x",
),
id="airflow-2-lineage-syntax",
),
],