@ -0,0 +1,156 @@
---
title: Run Connectors in your own Airflow
slug: /connectors/ingestion/run-connectors-in-airflow
---

# Run Connectors in your Airflow

We can use Airflow in different ways:

1. We can [extract metadata](https://docs.open-metadata.org/connectors/pipeline/airflow) from it,
2. And we can [connect it to the OpenMetadata UI](https://docs.open-metadata.org/deployment/airflow) to deploy Workflows automatically.

In this guide, we will show how to host the ingestion DAGs in your Airflow directly. Note that each connector
page (e.g., [Snowflake](https://docs.open-metadata.org/connectors/database/snowflake/airflow)) shows
an example of how to prepare a YAML configuration and run it as a DAG.

Here we are going to explain that in a bit more depth and show an alternative process to achieve the same result.

## Python Operator

Building a DAG using the `PythonOperator` requires you to install the `openmetadata-ingestion` package in your Airflow
environment. This is a comfortable approach if you have access to the Airflow host and can freely handle
dependencies.

Installing the dependencies is as easy as `pip3 install "openmetadata-ingestion[<your-connector>]"`.

For example, preparing a metadata ingestion DAG with this operator looks as follows:

```python
import yaml
from datetime import timedelta
from airflow import DAG

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago

from metadata.ingestion.api.workflow import Workflow

default_args = {
    "owner": "user_name",
    "email": ["username@org.com"],
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60)
}

config = """
<your YAML configuration>
"""


def metadata_ingestion_workflow():
    # Load the YAML config and run the ingestion workflow end to end
    workflow_config = yaml.safe_load(config)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "sample_data",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    schedule_interval='*/5 * * * *',
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
```

Note how we are preparing the `PythonOperator` by passing `python_callable=metadata_ingestion_workflow` as
an argument, where `metadata_ingestion_workflow` is a function that instantiates the `Workflow` class and runs
the whole process.
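
To make the placeholder more concrete, below is a minimal sketch of what the `config` string could contain, assuming a hypothetical MySQL service named `mysql_prod` and a local OpenMetadata server secured with a JWT bot token; take the exact YAML for your source from the corresponding connector page.

```python
# Hypothetical example of the `config` placeholder above. The service name,
# credentials, and hostPort values are illustrative only; copy the real YAML
# from your connector's documentation page.
config = """
source:
  type: mysql
  serviceName: mysql_prod
  serviceConnection:
    config:
      type: Mysql
      username: openmetadata_user
      password: openmetadata_password
      hostPort: localhost:3306
  sourceConfig:
    config:
      type: DatabaseMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: <JWT>
"""
```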

The drawback here? You need to install some requirements, which is not always possible. This is why in version 0.12.1 and
higher we introduced an alternative approach. More on that below!

## Docker Operator

From version 0.12.1 we are shipping a new image, `openmetadata/ingestion-base`, which only contains the `openmetadata-ingestion`
package and can then be used to handle ingestions in an isolated environment.

This is useful to prepare DAGs without installing anything on the Airflow environment, although it requires the host
to have access to the Docker commands.

For example, if you are running Airflow in Docker Compose, that can be achieved by preparing a volume mapping for the
`docker.sock` file with 666 permissions.

```yaml
volumes:
  - /var/run/docker.sock:/var/run/docker.sock:z  # Needs 666 permissions to run the DockerOperator
```

Then, preparing a DAG looks like this:

```python
from datetime import datetime

from airflow import models
from airflow.providers.docker.operators.docker import DockerOperator


config = """
<your YAML configuration>
"""


with models.DAG(
    "ingestion-docker-operator",
    schedule_interval='*/5 * * * *',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["OpenMetadata"],
) as dag:
    DockerOperator(
        command="python main.py",
        image="openmetadata/ingestion-base:0.12.1",
        environment={"config": config, "pipelineType": "metadata"},
        docker_url="unix://var/run/docker.sock",  # To allow Airflow to start Docker containers. Needs chmod 666 permissions
        tty=True,
        auto_remove=True,
        network_mode="host",  # To reach the OM server
        task_id="ingest",
        dag=dag,
    )
```

<Note>

Make sure to tune the DAG configurations (`schedule_interval`, `start_date`, etc.) as your use case requires.

</Note>

Note that the example uses the image `openmetadata/ingestion-base:0.12.1`. Update that accordingly for higher versions
once they are released. Also, the image version should be aligned with your OpenMetadata server version to avoid
incompatibilities.

Another important point here is making sure that Airflow will be able to run Docker commands to create the task.
As our example was done with Airflow in Docker Compose, that meant setting `docker_url="unix://var/run/docker.sock"`.

The final important elements here are:
- `command="python main.py"`: This does not need to be modified, as we are shipping the `main.py` script in the
  image, which is used to trigger the workflow.
- `environment={"config": config, "pipelineType": "metadata"}`: Again, in most cases you will just need to update
  the `config` string to point to the right connector.

Other supported values of `pipelineType` are `usage`, `lineage`, `profiler` or `TestSuite`. Pass the required value
depending on the type of workflow you want to execute, and make sure that the YAML config includes the ingredients
required for that workflow.
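
As an illustration of switching the pipeline type (this is not part of the original example), a second task running the profiler could look like the sketch below; `profiler_config` is a hypothetical YAML string that, besides `source`, `sink` and `workflowConfig`, also defines the `processor` section the profiler needs.

```python
# Hypothetical variation of the task above that runs a profiler workflow.
# `profiler_config` is assumed to be a YAML string including the processor
# section required by the profiler; everything else follows the same pattern.
profile_task = DockerOperator(
    command="python main.py",  # Same entrypoint shipped in the image
    image="openmetadata/ingestion-base:0.12.1",
    environment={"config": profiler_config, "pipelineType": "profiler"},
    docker_url="unix://var/run/docker.sock",
    tty=True,
    auto_remove=True,
    network_mode="host",
    task_id="profile",
    dag=dag,
)
```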
@ -7,16 +7,27 @@ slug: /connectors/pipeline/airflow/gcs

<Note>

This approach has been tested against Airflow 2.1.4 & 2.2.5. If you have any issues or questions,
please do not hesitate to reach out!

</Note>

There are 2 main approaches we can follow here to extract metadata from GCS Composer. Both of them involve creating a DAG
directly in your Composer instance, but the requirements and the steps to follow are going to be slightly different.

Feel free to choose whichever approach adapts best to your current architecture and constraints.

## Using the Python Operator

The most comfortable way to extract metadata out of GCS Composer is by directly creating a DAG there
that will handle the connection to the metadata database automatically and push the contents
to your OpenMetadata server.

The drawback here? You need to install `openmetadata-ingestion` directly on the host. This might have some
incompatibilities with your current Python environment and/or the internal (and changing) Composer requirements.
In any case, once the requirements are there, preparing the DAG is super straightforward.

### Install the Requirements

In your environment you will need to install the following packages:

@ -32,7 +43,7 @@ you currently have!

</Note>

### Prepare the DAG!

Note that this DAG is a usual connector DAG, just using the Airflow service with the `Backend` connection.

@ -119,6 +130,147 @@ with DAG(
    )
```

## Using the Kubernetes Pod Operator

In this second approach we won't need to install anything at all in the GCS Composer environment. Instead,
we will rely on the `KubernetesPodOperator` to use the underlying k8s cluster of Composer.

Then, the code won't run directly in the host's environment, but rather inside a container that we created
with only the `openmetadata-ingestion` package.

<Note>

This approach requires the `openmetadata/ingestion-base` image, which is only available from version 0.12.1 onwards!

</Note>

### Requirements

The only thing we need to handle here is getting the URL of the underlying Composer database. You can follow
the official GCS [docs](https://cloud.google.com/composer/docs/composer-2/access-airflow-database) for the steps to
obtain the credentials.

In a nutshell, from the Airflow UI you can go to Admin > Configurations and search for `sql_alchemy_conn`. In our case,
the URL looked like this:

```
postgresql+psycopg2://root:<pwd>@airflow-sqlproxy-service.composer-system.svc.cluster.local:3306/composer-2-0-28-airflow-2-2-5-5ab01d14
```

As GCS Composer uses Postgres for the backend database, our Airflow connection configuration will be shaped as:

```yaml
connection:
  type: Postgres
  username: root
  password: ...
  hostPort: airflow-sqlproxy-service.composer-system.svc.cluster.local:3306
  database: composer-2-0-28-airflow-2-2-5-5ab01d14
```
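
If you prefer to derive those fields programmatically instead of splitting the URL by hand, a small sketch (not from the original docs) using SQLAlchemy, which already ships with Airflow, could look like this:

```python
# Hypothetical helper: split the sql_alchemy_conn URL from the Airflow
# configuration into the pieces needed for the OpenMetadata connection block.
from sqlalchemy.engine.url import make_url

sql_alchemy_conn = (
    "postgresql+psycopg2://root:<pwd>@airflow-sqlproxy-service"
    ".composer-system.svc.cluster.local:3306/composer-2-0-28-airflow-2-2-5-5ab01d14"
)

url = make_url(sql_alchemy_conn)
print(url.username)              # root
print(f"{url.host}:{url.port}")  # airflow-sqlproxy-service.composer-system.svc.cluster.local:3306
print(url.database)              # composer-2-0-28-airflow-2-2-5-5ab01d14
```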

For more information on how to shape the YAML describing the Airflow metadata extraction, you can refer
[here](https://docs.open-metadata.org/connectors/pipeline/airflow/cli#1-define-the-yaml-config).

### Prepare the DAG!

```python
from datetime import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator


config = """
source:
  type: airflow
  serviceName: airflow_gcs_composer_k8s_op
  serviceConnection:
    config:
      type: Airflow
      hostPort: http://localhost:8080
      numberOfStatus: 10
      connection:
        type: Postgres
        username: root
        password: ...
        hostPort: airflow-sqlproxy-service.composer-system.svc.cluster.local:3306
        database: composer-2-0-28-airflow-2-2-5-5ab01d14
  sourceConfig:
    config:
      type: PipelineMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: https://sandbox.open-metadata.org/api
    enableVersionValidation: false
    authProvider: openmetadata
    securityConfig:
      jwtToken: <JWT>
"""


with models.DAG(
    "ingestion-k8s-operator",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["OpenMetadata"],
) as dag:
    KubernetesPodOperator(
        task_id="ingest",
        name="ingest",
        cmds=["python", "main.py"],
        image="openmetadata/ingestion-base:0.12.1",
        namespace='default',
        env_vars={"config": config, "pipelineType": "metadata"},
        dag=dag,
    )
```

Some remarks on this example code:

#### Kubernetes Pod Operator

You can name the task as you want (`task_id` and `name`). The important points here are `cmds`, which should not
be changed, and `env_vars`. The `main.py` script that gets shipped within the image will load the env vars
as they are shown, so only modify the content of the config YAML, not this dictionary.
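
As a mental model only (this is not the actual source of the image), `main.py` behaves roughly like the sketch below: it reads the two environment variables set by the operator and triggers the matching workflow.

```python
# Rough conceptual sketch of what the shipped main.py does; the real script
# also handles the other pipelineType values (usage, lineage, profiler,
# TestSuite), each backed by its own workflow class.
import os

import yaml

from metadata.ingestion.api.workflow import Workflow

workflow_config = yaml.safe_load(os.environ["config"])
pipeline_type = os.environ.get("pipelineType", "metadata")

if pipeline_type == "metadata":
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
```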

Note that the example uses the image `openmetadata/ingestion-base:0.12.1`. Update that accordingly for higher versions
once they are released. Also, the image version should be aligned with your OpenMetadata server version to avoid
incompatibilities.

```python
KubernetesPodOperator(
    task_id="ingest",
    name="ingest",
    cmds=["python", "main.py"],
    image="openmetadata/ingestion-base:0.12.1",
    namespace='default',
    env_vars={"config": config, "pipelineType": "metadata"},
    dag=dag,
)
```

You can find more information about the `KubernetesPodOperator` and how to tune its configurations
[here](https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator).

## OpenMetadata Server Config

The easiest approach here is to generate a bot with a **JWT** token directly from the OpenMetadata UI. You can then use
the following workflow config:

```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: <JWT>
```

## Google SSO

For Google SSO we need to use the [Cloud Storage](https://cloud.google.com/composer/docs/concepts/cloud-storage)
@ -127,3 +279,14 @@ against `/home/airflow/gcs/data/` in Airflow.

You can see in the example above how our file is named `gcs_creds_beta.json`, which gets resolved in Airflow as
`/home/airflow/gcs/data/gcs_creds_beta.json`.

The workflow config here would look like:

```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: https://sandbox.getcollate.io/api
    authProvider: google
    securityConfig:
      secretKey: /home/airflow/gcs/data/gcs_creds_beta.json
```

@ -480,6 +480,8 @@ site_menu:
    url: /connectors/ingestion/versioning/event-notification-via-webhooks
  - category: Connectors / Ingestion / Ingestion Pipeline UI Deployment
    url: /connectors/ingestion/deployment
  - category: Connectors / Ingestion / Run Connectors in your Airflow
    url: /connectors/ingestion/run-connectors-in-airflow

  - category: How to guides