If you're looking to schedule DataHub ingestion using Airflow, see the guide on [scheduling ingestion with Airflow](../../metadata-ingestion/schedule_docs/airflow.md).
- Automatic column-level lineage extraction from various operators, e.g. SQL operators (including `MySqlOperator`, `PostgresOperator`, `SnowflakeOperator`, and `BigQueryInsertJobOperator`), `S3FileTransformOperator`, and more.
The plugin requires Airflow 2.7+ and Python 3.9+. If you're using Airflow older than 2.7, it's possible to use the plugin with older versions of `acryl-datahub-airflow-plugin`. See the [compatibility section](#compatibility) for more details.
The plugin requires Airflow 2.7+ and Python 3.9+. If you don't meet these requirements, see the [compatibility section](#compatibility) for other options.
In the Airflow UI, go to Admin -> Connections and click the "+" symbol to create a new connection. Select "DataHub REST Server" from the "Connection Type" dropdown and enter the appropriate values.
No additional configuration is required to use the plugin. However, there are some optional configuration parameters that can be set in the `airflow.cfg` file.
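
As a rough illustration, the optional settings live under a `[datahub]` section. Only `enabled` appears elsewhere on this page; the other option names and values below (`conn_id`, `cluster`, `capture_executions`) are recalled from the plugin's configuration reference, so treat them as assumptions and confirm them against the plugin version you have installed.

```ini title="airflow.cfg"
[datahub]
# Master switch for the plugin.
enabled = True
# Assumed option: ID of the Airflow connection that points at DataHub.
conn_id = datahub_rest_default
# Assumed option: environment label attached to the emitted metadata.
cluster = prod
# Assumed option: also record individual DAG and task runs.
capture_executions = True
```

Any option you leave out keeps its default, so an empty or missing `[datahub]` section is also valid.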
To automatically extract lineage information, the plugin builds on top of Airflow's built-in [OpenLineage extractors](https://openlineage.io/docs/integrations/airflow/default-extractors).
The SQL-related extractors have been updated to use [DataHub's SQL lineage parser](./sql_parsing.md), which is more robust than the built-in one and uses DataHub's metadata information to generate column-level lineage.
- `SQLExecuteQueryOperator`, including any subclasses. Note that in newer versions of Airflow (generally Airflow 2.5+), most SQL operators inherit from this class.
- `AthenaOperator` and `AWSAthenaOperator`
- `BigQueryOperator` and `BigQueryExecuteQueryOperator`
You can manually annotate lineage by setting `inlets` and `outlets` on your Airflow operators. This is useful if you're using an operator that doesn't support automatic lineage extraction, or if you want to override the automatic lineage extraction.
We have a few code samples that demonstrate how to use `inlets` and `outlets`:
- [`lineage_backend_taskflow_demo.py`](../../metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/lineage_backend_taskflow_demo.py) - uses the [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html)
For more information, take a look at the [Airflow lineage docs](https://airflow.apache.org/docs/apache-airflow/stable/lineage.html).
### Custom Operators
If you have created a [custom Airflow operator](https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html) that inherits from the BaseOperator class,
You can only set table-level lineage using inlets and outlets. For column-level lineage, you need to write a custom extractor for your custom operator.
If you override the `pre_execute` and `post_execute` functions, ensure they include the `@prepare_lineage` and `@apply_lineage` decorators respectively. See the [Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/lineage.html#lineage) for more details.
See this [example implementation](../../metadata-ingestion-modules/airflow-plugin/tests/integration/dags/custom_operator_sql_parsing.py) of a custom operator that uses the SQL parser to capture table-level lineage.
You can also create a custom extractor to extract lineage from any operator. This is useful if you're using a built-in Airflow operator for which we don't support automatic lineage extraction.
See this [example PR](https://github.com/datahub-project/datahub/pull/10452) which adds a custom extractor for the `BigQueryInsertJobOperator` operator.
## Cleanup obsolete pipelines and tasks from DataHub
DAGs may be removed from Airflow while their corresponding pipelines and tasks remain in DataHub. We refer to these as `obsolete pipelines and tasks`.
Follow these steps to clean them up from DataHub:
Reference [`lineage_emission_dag.py`](../../metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/lineage_emission_dag.py) for a full example.
To use this example, you must first configure the DataHub hook. As with ingestion, we support a DataHub REST hook and a Kafka-based hook. See the plugin configuration for examples.
- It should also print a log line like `INFO [datahub_airflow_plugin.datahub_listener] DataHub plugin using DataHubRestEmitter: configured to talk to <datahub_url>` during Airflow startup, and the `airflow plugins` command should list `datahub_plugin` with a listener enabled.
- If using the plugin's automatic lineage, ensure that the `enable_extractors` config is set to true and that automatic lineage is supported for your operator.
- If using manual lineage annotation, ensure that you're using the `datahub_airflow_plugin.entities.Dataset` or `datahub_airflow_plugin.entities.Urn` classes for your inlets and outlets.
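
For the automatic-lineage check above, a minimal `airflow.cfg` sketch follows. `enable_extractors` is the option named in the checklist; placing it in the `[datahub]` section is an assumption based on where the plugin's `enabled` option lives.

```ini title="airflow.cfg"
[datahub]
# Turn on the plugin's automatic lineage extractors.
enable_extractors = True
```

As with other `airflow.cfg` changes, restart Airflow so the new value is picked up.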
If your URLs aren't being generated correctly (usually they'll start with `http://localhost:8080` instead of the correct hostname), you may need to set the webserver `base_url` config.
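
A sketch of that setting, assuming an Airflow 2.x `airflow.cfg`. The hostname is a placeholder, not a real endpoint.

```ini title="airflow.cfg"
[webserver]
# Placeholder: replace with the externally reachable URL of your Airflow UI.
base_url = http://airflow.example.com
```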
The solution is to upgrade to `acryl-datahub-airflow-plugin>=0.12.0.4` or `pluggy>=1.2.0`. See this [PR](https://github.com/datahub-project/datahub/pull/9365) for details.
For extremely large Airflow deployments with thousands of tasks, you may see issues where the plugin interferes with the performance of the Airflow scheduler. In those cases, you can set the `DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT=0` environment variable. This makes the DataHub plugin run fully in background threads, but can cause us to miss some metadata if the scheduler shuts down soon after processing a task.
Set the `datahub.enabled` configuration property to `False` in the `airflow.cfg` file and restart the Airflow environment to reload the configuration and disable the plugin.
```ini title="airflow.cfg"
[datahub]
enabled = False
```
#### 2. Disable via Airflow Variable (Kill-Switch)
If a restart is not possible and you need a faster way to disable the plugin, you can use the kill-switch. Create and set the `datahub_airflow_plugin_disable_listener` Airflow variable to `true`. This ensures that the listener won't process anything.
#### Command Line
```shell
airflow variables set datahub_airflow_plugin_disable_listener true
```
#### Airflow UI
1. Go to Admin -> Variables.
2. Click the "+" symbol to create a new variable.
3. Set the key to `datahub_airflow_plugin_disable_listener` and the value to `true`.
This will immediately disable the plugin without requiring a restart.
We try to support Airflow releases for ~2 years after their release. This is a best-effort guarantee - it's not always possible due to dependency / security issues cropping up in older versions.
We previously had two implementations of the plugin: v1 and v2. The v2 plugin is now the default, and the v1 plugin has since been removed. The v1 plugin had many limitations, chiefly that it did not support automatic lineage extraction. Docs for the v1 plugin can be accessed in our [docs archive](https://docs-website-r5eolot5n-acryldata.vercel.app/docs/lineage/airflow#datahub-plugin-v1).
DataHub also previously supported an Airflow [lineage backend](https://airflow.apache.org/docs/apache-airflow/2.2.0/lineage.html#lineage-backend) implementation. The lineage backend functionality was pretty limited - it did not support automatic lineage extraction, did not capture task failures, and did not work in AWS MWAA - and so it has been removed from the codebase. The [documentation for the lineage backend](https://docs-website-1wmaehubl-acryldata.vercel.app/docs/lineage/airflow/#using-datahubs-airflow-lineage-backend-deprecated) has been archived.