docs(ingest): add airflow docs that use the PythonVirtualenvOperator (#6604)

Harshal Sheth 2022-12-02 13:56:17 -05:00 committed by GitHub
parent 71466aab36
commit a1e62c723e
7 changed files with 150 additions and 33 deletions

View File

@ -2,11 +2,41 @@
If you are using Apache Airflow for your scheduling, then you might want to also use it for scheduling your ingestion recipes. For any Airflow-specific questions, you can go through the [Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/) for more details.
To schedule your recipe through Airflow, you can follow these steps:
- Create a recipe file, e.g. `recipe.yml`.
- Ensure the recipe file is in a folder accessible to your Airflow workers. You can either specify an absolute path on the machines where Airflow is installed or a path relative to `AIRFLOW_HOME`.
- Ensure the [DataHub CLI](../../docs/cli.md) is installed in your Airflow environment.
- Create a sample DAG file like [`generic_recipe_sample_dag.py`](../src/datahub_provider/example_dags/generic_recipe_sample_dag.py). This will read your DataHub ingestion recipe file and run it.
We've provided a few examples of how to configure your DAG:
- [`mysql_sample_dag`](../src/datahub_provider/example_dags/mysql_sample_dag.py) embeds the full MySQL ingestion configuration inside the DAG.
- [`snowflake_sample_dag`](../src/datahub_provider/example_dags/snowflake_sample_dag.py) avoids embedding credentials inside the recipe, and instead fetches them from Airflow's [Connections](https://airflow.apache.org/docs/apache-airflow/stable/howto/connection/index.html) feature. You must configure your connections in Airflow to use this approach.
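If those connections aren't configured yet, here is a minimal sketch of one way to register them programmatically. The connection IDs (`snowflake_admin_default`, `datahub_rest_default`) and the `extra` fields mirror what `snowflake_sample_dag.py` reads; the credential values, the DataHub host, and the `conn_type` strings shown here are placeholders, and the Airflow UI or the `airflow connections` CLI work just as well.
```py
# Illustrative only: registers the two connections that snowflake_sample_dag.py expects.
# The credential values below are placeholders.
import json

from airflow import settings
from airflow.models import Connection

snowflake_conn = Connection(
    conn_id="snowflake_admin_default",
    conn_type="snowflake",
    login="<snowflake-user>",
    password="<snowflake-password>",
    extra=json.dumps(
        {
            "account": "<account-id>",  # required by the example DAG
            "warehouse": "<warehouse>",  # optional
            "role": "<role>",  # optional
        }
    ),
)
datahub_conn = Connection(
    conn_id="datahub_rest_default",
    conn_type="datahub_rest",  # only the host is read by the example DAG
    host="http://datahub-gms:8080",  # placeholder DataHub GMS endpoint
)

# Persist the connections to Airflow's metadata database.
session = settings.Session()
session.add(snowflake_conn)
session.add(datahub_conn)
session.commit()
```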
:::tip
These example DAGs use the `PythonVirtualenvOperator` to run the ingestion. This is the recommended approach, since it guarantees that there will not be any conflicts between DataHub and the rest of your Airflow environment.
When configuring the task, it's important to specify the requirements for your source and to set the `system_site_packages` option to `False`.
```py
ingestion_task = PythonVirtualenvOperator(
    task_id="ingestion_task",
    requirements=[
        "acryl-datahub[<your-source>]",
    ],
    system_site_packages=False,
    python_callable=your_callable,
)
```
:::
<details>
<summary>Advanced: loading a recipe file</summary>
In more advanced cases, you might want to store your ingestion recipe in a file and load it from your task.
- Ensure the recipe file is in a folder accessible to your Airflow workers. You can either specify an absolute path on the machines where Airflow is installed or a path relative to `AIRFLOW_HOME`.
- Ensure the [DataHub CLI](../../docs/cli.md) is installed in your Airflow environment.
- Create a DAG task to read your DataHub ingestion recipe file and run it. See the example below for reference.
- Deploy the DAG file into Airflow for scheduling. Typically this involves checking the DAG file into the DAGs folder that is accessible to your Airflow instance.
Alternatively, you can use an inline recipe, as shown in [`mysql_sample_dag.py`](../src/datahub_provider/example_dags/mysql_sample_dag.py), which runs a MySQL metadata ingestion pipeline using an inlined configuration.
Example: [`generic_recipe_sample_dag`](../src/datahub_provider/example_dags/generic_recipe_sample_dag.py)
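As a rough sketch, such a task might look like the following. This assumes the `load_config_file` helper from `datahub.configuration.config_loader` (as used in the sample DAG above); the DAG ID, schedule, and recipe path are placeholders, and the imports are kept inside the callable so that they resolve within the virtualenv.
```py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator


def ingest_from_recipe():
    # Imports live inside the callable so they resolve within the virtualenv.
    from datahub.configuration.config_loader import load_config_file
    from datahub.ingestion.run.pipeline import Pipeline

    # Note that this will also resolve environment variables in the recipe.
    config = load_config_file("path/to/recipe.yml")  # placeholder path

    pipeline = Pipeline.create(config)
    pipeline.run()
    pipeline.raise_from_status()


with DAG(
    "datahub_recipe_ingest",  # placeholder DAG ID
    default_args={"owner": "airflow"},
    description="An example DAG which runs a DataHub ingestion recipe",
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:
    ingest_task = PythonVirtualenvOperator(
        task_id="ingest_from_recipe",
        requirements=[
            "acryl-datahub[<your-source>]",  # include the extras for your source
        ],
        system_site_packages=False,
        python_callable=ingest_from_recipe,
    )
```
Because the recipe is read inside the task, the path must be reachable from the Airflow workers, which is why the recipe file should live in a folder they can access.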
</details>

View File

@ -461,8 +461,9 @@ base_dev_requirements = {
dev_requirements = {
    *base_dev_requirements,
    # Extra requirements for Airflow.
    "apache-airflow[snowflake]>=2.0.2",  # snowflake is used in example dags
    "snowflake-sqlalchemy<=1.2.4",  # make constraint consistent with extras
    "virtualenv",  # needed by PythonVirtualenvOperator
}
full_test_dev_requirements = {

View File

@ -0,0 +1,2 @@
# This file uses a connection hook, which fails to load unless configured.
snowflake_sample_dag.py

View File

@ -26,6 +26,7 @@ default_args = {
def datahub_recipe():
    # Note that this will also resolve environment variables in the recipe.
    config = load_config_file("path/to/recipe.yml")
    pipeline = Pipeline.create(config)

View File

@ -5,33 +5,22 @@ from within an Airflow DAG. Note that the DB connection configuration is
embedded within the code.
"""
from datetime import timedelta
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datahub.ingestion.run.pipeline import Pipeline
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["jdoe@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=120),
}
from airflow.operators.python import PythonVirtualenvOperator
def ingest_from_mysql():
    from datahub.ingestion.run.pipeline import Pipeline
    pipeline = Pipeline.create(
        # This configuration is analogous to a recipe configuration.
        {
            "source": {
                "type": "mysql",
                "config": {
                    # If you want to use Airflow connections, take a look at the snowflake_sample_dag.py example.
                    "username": "user",
                    "password": "pass",
                    "database": "db_name",
@ -45,18 +34,28 @@ def ingest_from_mysql():
        }
    )
    pipeline.run()
    pipeline.pretty_print_summary()
    pipeline.raise_from_status()
with DAG(
    "datahub_mysql_ingest",
    default_args=default_args,
    default_args={
        "owner": "airflow",
    },
    description="An example DAG which ingests metadata from MySQL to DataHub",
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
    # While it is also possible to use the PythonOperator, we recommend using
    # the PythonVirtualenvOperator to ensure that there are no dependency
    # conflicts between DataHub and the rest of your Airflow environment.
    ingest_task = PythonVirtualenvOperator(
        task_id="ingest_from_mysql",
        requirements=[
            "acryl-datahub[mysql]",
        ],
        system_site_packages=False,
        python_callable=ingest_from_mysql,
    )

View File

@ -0,0 +1,87 @@
"""Snowflake DataHub Ingest DAG
This example demonstrates how to ingest metadata from Snowflake into DataHub
from within an Airflow DAG. In contrast to the MySQL example, this DAG
pulls the DB connection configuration from Airflow's connection store.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.python import PythonVirtualenvOperator
def ingest_from_snowflake(snowflake_credentials, datahub_gms_server):
    from datahub.ingestion.run.pipeline import Pipeline
    pipeline = Pipeline.create(
        # This configuration is analogous to a recipe configuration.
        {
            "source": {
                "type": "snowflake",
                "config": {
                    **snowflake_credentials,
                    # Other Snowflake config can be added here.
                    "profiling": {"enabled": False},
                },
            },
            # Other ingestion features, like transformers, are also supported.
            # "transformers": [
            #     {
            #         "type": "simple_add_dataset_ownership",
            #         "config": {
            #             "owner_urns": [
            #                 "urn:li:corpuser:example",
            #             ]
            #         },
            #     }
            # ],
            "sink": {
                "type": "datahub-rest",
                "config": {"server": datahub_gms_server},
            },
        }
    )
    pipeline.run()
    pipeline.pretty_print_summary()
    pipeline.raise_from_status()
with DAG(
    "datahub_snowflake_ingest",
    default_args={
        "owner": "airflow",
    },
    description="An example DAG which ingests metadata from Snowflake to DataHub",
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:
    # This example pulls credentials from Airflow's connection store.
    # For this to work, you must have previously configured these connections in Airflow.
    # See the Airflow docs for details: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html
    snowflake_conn = BaseHook.get_connection("snowflake_admin_default")
    datahub_conn = BaseHook.get_connection("datahub_rest_default")
    # While it is also possible to use the PythonOperator, we recommend using
    # the PythonVirtualenvOperator to ensure that there are no dependency
    # conflicts between DataHub and the rest of your Airflow environment.
    ingest_task = PythonVirtualenvOperator(
        task_id="ingest_from_snowflake",
        requirements=[
            "acryl-datahub[snowflake]",
        ],
        system_site_packages=False,
        python_callable=ingest_from_snowflake,
        op_kwargs={
            "snowflake_credentials": {
                "username": snowflake_conn.login,
                "password": snowflake_conn.password,
                "account_id": snowflake_conn.extra_dejson["account"],
                "warehouse": snowflake_conn.extra_dejson.get("warehouse"),
                "role": snowflake_conn.extra_dejson.get("role"),
            },
            "datahub_gms_server": datahub_conn.host,
        },
    )

View File

@ -75,21 +75,18 @@ def test_airflow_provider_info():
    assert get_provider_info()
@pytest.mark.skipif(
    AIRFLOW_VERSION < packaging.version.parse("2.0.0"),
    reason="the examples use list-style lineage, which is only supported on Airflow 2.x",
)
def test_dags_load_with_no_errors(pytestconfig):
def test_dags_load_with_no_errors(pytestconfig: pytest.Config) -> None:
    airflow_examples_folder = (
        pytestconfig.rootpath / "src/datahub_provider/example_dags"
    )
    # Note: the .airflowignore file skips the snowflake DAG.
    dag_bag = DagBag(dag_folder=str(airflow_examples_folder), include_examples=False)
    import_errors = dag_bag.import_errors
    assert import_errors == {}
    assert len(dag_bag.dag_ids) > 0
    assert len(import_errors) == 0
    assert dag_bag.size() > 0
@contextmanager