From 1edd41e28a0819df452e36b5b9273ad43ac2a51c Mon Sep 17 00:00:00 2001
From: Pere Miquel Brull
Date: Thu, 13 Oct 2022 23:00:15 +0200
Subject: [PATCH] Fix #7898 - DockerOperator and GCS K8s operator docs (#8137)

---
 .../ingestion/deployment/airflow-dag.md   | 156 ++++++++++++++++
 .../connectors/pipeline/airflow/gcs.md    | 169 +++++++++++++++++-
 openmetadata-docs/content/menu.md         |   2 +
 3 files changed, 324 insertions(+), 3 deletions(-)
 create mode 100644 openmetadata-docs/content/connectors/ingestion/deployment/airflow-dag.md

diff --git a/openmetadata-docs/content/connectors/ingestion/deployment/airflow-dag.md b/openmetadata-docs/content/connectors/ingestion/deployment/airflow-dag.md
new file mode 100644
index 00000000000..4dc2345d5ec
--- /dev/null
+++ b/openmetadata-docs/content/connectors/ingestion/deployment/airflow-dag.md
@@ -0,0 +1,156 @@
+---
+title: Run Connectors in your own Airflow
+slug: /connectors/ingestion/run-connectors-in-airflow
+---
+
+# Run Connectors in your Airflow
+
+We can use Airflow in different ways:
+1. We can [extract metadata](https://docs.open-metadata.org/connectors/pipeline/airflow) from it,
+2. And we can [connect it to the OpenMetadata UI](https://docs.open-metadata.org/deployment/airflow) to deploy Workflows automatically.
+
+In this guide, we will show how to host the ingestion DAGs in your Airflow directly. Note that in each connector
+page (e.g., [Snowflake](https://docs.open-metadata.org/connectors/database/snowflake/airflow)) we are showing
+an example of how to prepare a YAML configuration and run it as a DAG.
+
+Here we are going to explain that a bit further and show an alternative process to achieve the same result.
+
+## Python Operator
+
+Building a DAG using the `PythonOperator` requires you to install the `openmetadata-ingestion` package in your
+Airflow environment. This is a comfortable approach if you have access to the Airflow host and can freely handle
+dependencies.
+
+Installing the dependencies is as easy as `pip3 install "openmetadata-ingestion[]"`.
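+
+For instance, to extract metadata from a Snowflake service you would install its plugin with
+`pip3 install "openmetadata-ingestion[snowflake]"`; this is just an illustration, so check each connector page
+for the exact plugin name your source needs.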
+
+Preparing a metadata ingestion DAG with this operator will then look as follows:
+
+```python
+import yaml
+from datetime import timedelta
+from airflow import DAG
+
+try:
+    from airflow.operators.python import PythonOperator
+except ModuleNotFoundError:
+    from airflow.operators.python_operator import PythonOperator
+
+from metadata.ingestion.api.workflow import Workflow
+from airflow.utils.dates import days_ago
+
+default_args = {
+    "owner": "user_name",
+    "email": ["username@org.com"],
+    "email_on_failure": False,
+    "retries": 3,
+    "retry_delay": timedelta(minutes=5),
+    "execution_timeout": timedelta(minutes=60)
+}
+
+config = """
+
+"""
+
+def metadata_ingestion_workflow():
+    workflow_config = yaml.safe_load(config)
+    workflow = Workflow.create(workflow_config)
+    workflow.execute()
+    workflow.raise_from_status()
+    workflow.print_status()
+    workflow.stop()
+
+with DAG(
+    "sample_data",
+    default_args=default_args,
+    description="An example DAG which runs an OpenMetadata ingestion workflow",
+    start_date=days_ago(1),
+    is_paused_upon_creation=False,
+    schedule_interval='*/5 * * * *',
+    catchup=False,
+) as dag:
+    ingest_task = PythonOperator(
+        task_id="ingest_using_recipe",
+        python_callable=metadata_ingestion_workflow,
+    )
+```
+
+Note how we are preparing the `PythonOperator` by passing `python_callable=metadata_ingestion_workflow` as
+an argument, where `metadata_ingestion_workflow` is a function that instantiates the `Workflow` class and runs
+the whole process.
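+
+For reference, the `config` string should hold a standard ingestion YAML. A minimal sketch for a hypothetical
+Snowflake service could look like the one below; the service name and credentials are placeholders, and each
+connector page documents the full set of connection options:
+
+```yaml
+source:
+  type: snowflake
+  serviceName: snowflake_prod  # hypothetical service name
+  serviceConnection:
+    config:
+      type: Snowflake
+      username: <username>
+      password: <password>
+      account: <account>
+      warehouse: <warehouse>
+  sourceConfig:
+    config:
+      type: DatabaseMetadata
+sink:
+  type: metadata-rest
+  config: {}
+workflowConfig:
+  openMetadataServerConfig:
+    hostPort: http://localhost:8585/api
+    authProvider: openmetadata
+    securityConfig:
+      jwtToken: <token>
+```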
+
+The drawback here? You need to install some requirements, which is not always possible. This is why in 0.12.1 and
+higher versions we introduced an alternative approach. More on that below!
+
+## Docker Operator
+
+From version 0.12.1 we are shipping a new image `openmetadata/ingestion-base`, which only contains the `openmetadata-ingestion`
+package and can then be used to handle ingestions in an isolated environment.
+
+This is useful for preparing DAGs without installing anything in the Airflow environment, although it requires the
+host to have access to the Docker commands.
+
+For example, if you are running Airflow in Docker Compose, that can be achieved by preparing a volume mapping the
+`docker.sock` file with 666 permissions.
+
+```yaml
+volumes:
+  - /var/run/docker.sock:/var/run/docker.sock:z  # Need 666 permissions to run DockerOperator
+```
+
+Then, preparing a DAG looks like this:
+
+```python
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.docker.operators.docker import DockerOperator
+
+
+config = """
+
+"""
+
+with models.DAG(
+    "ingestion-docker-operator",
+    schedule_interval='*/5 * * * *',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["OpenMetadata"],
+) as dag:
+    DockerOperator(
+        command="python main.py",
+        image="openmetadata/ingestion-base:0.12.1",
+        environment={"config": config, "pipelineType": "metadata"},
+        docker_url="unix://var/run/docker.sock",  # To allow the operator to reach Docker. Needs chmod 666 permissions
+        tty=True,
+        auto_remove="True",
+        network_mode="host",  # To reach the OM server
+        task_id="ingest",
+        dag=dag,
+    )
+```
+
+Make sure to tune the DAG configurations (`schedule_interval`, `start_date`, etc.) as your use case requires.
+
+Note that the example uses the image `openmetadata/ingestion-base:0.12.1`. Update that accordingly for higher versions
+once they are released. Also, the image version should be aligned with your OpenMetadata server version to avoid
+incompatibilities.
+
+Another important point here is making sure that Airflow will be able to run Docker commands to create the task.
+As our example was done with Airflow in Docker Compose, that meant setting `docker_url="unix://var/run/docker.sock"`.
+
+The final important elements here are:
+- `command="python main.py"`: This does not need to be modified, as we are shipping the `main.py` script in the
+  image, used to trigger the workflow.
+- `environment={"config": config, "pipelineType": "metadata"}`: Again, in most cases you will just need to update
+  the `config` string to point to the right connector.
+
+Other supported values of `pipelineType` are `usage`, `lineage`, `profiler` or `TestSuite`. Pass the value that matches
+the type of workflow you want to execute, and make sure that the YAML config contains the elements that workflow
+requires.
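+
+For instance, assuming you had a second YAML in a hypothetical `profiler_config` variable with the profiler-specific
+sections, a profiler task placed inside the same `with models.DAG(...)` block would reuse the exact same operator and
+only switch the `pipelineType` value:
+
+```python
+    DockerOperator(
+        command="python main.py",  # Same entrypoint shipped in the image
+        image="openmetadata/ingestion-base:0.12.1",
+        # Same mechanism as above; only the pipeline type and the YAML change
+        environment={"config": profiler_config, "pipelineType": "profiler"},
+        docker_url="unix://var/run/docker.sock",
+        tty=True,
+        auto_remove="True",
+        network_mode="host",
+        task_id="profile",
+        dag=dag,
+    )
+```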
diff --git a/openmetadata-docs/content/connectors/pipeline/airflow/gcs.md b/openmetadata-docs/content/connectors/pipeline/airflow/gcs.md
index f92e52037f3..8b8eb5a265e 100644
--- a/openmetadata-docs/content/connectors/pipeline/airflow/gcs.md
+++ b/openmetadata-docs/content/connectors/pipeline/airflow/gcs.md
@@ -7,16 +7,27 @@ slug: /connectors/pipeline/airflow/gcs
 
-This approach has been tested against Airflow 2.1.4. If you have any issues or questions,
+This approach has been tested against Airflow 2.1.4 and 2.2.5. If you have any issues or questions,
 please do not hesitate to reach out!
 
+There are two main approaches we can follow here to extract metadata from GCS Composer. Both of them involve creating
+a DAG directly in your Composer instance, but the requirements and the steps to follow are going to be slightly different.
+
+Feel free to choose whichever approach adapts best to your current architecture and constraints.
+
+## Using the Python Operator
+
 The most comfortable way to extract metadata out of GCS Composer is by directly creating a DAG in there
 that will handle the connection to the metadata database automatically and push the contents
 to your OpenMetadata server.
 
-## Install the Requirements
+The drawback here? You need to install `openmetadata-ingestion` directly on the host. This might have some
+incompatibilities with your current Python environment and/or the internal (and changing) Composer requirements.
+In any case, once the requirements are there, preparing the DAG is straightforward.
+
+### Install the Requirements
 
 In your environment you will need to install the following packages:
 
@@ -32,7 +43,7 @@ you currently have!
 
-## Prepare the DAG!
+### Prepare the DAG!
 
 Note that this DAG is a usual connector DAG, just using the Airflow service with the `Backend`
 connection.
 
@@ -119,6 +130,147 @@ with DAG(
 )
 ```
 
+## Using the Kubernetes Pod Operator
+
+In this second approach we won't need to install anything in the GCS Composer environment. Instead,
+we will rely on the `KubernetesPodOperator` to use the underlying k8s cluster of Composer.
+
+Then, the code won't run directly in the host's environment, but rather inside a container that we created
+with only the `openmetadata-ingestion` package.
+
+The `openmetadata/ingestion-base` image required for this approach is only available from version 0.12.1 onwards!
+
+### Requirements
+
+The only thing we need to handle here is getting the URL of the underlying Composer database. You can follow
+the official GCS [docs](https://cloud.google.com/composer/docs/composer-2/access-airflow-database) for the steps to
+obtain the credentials.
+
+In a nutshell, from the Airflow UI you can go to Admin > Configurations and search for `sql_alchemy_conn`. In our case,
+the URL looked like this:
+
+```
+postgresql+psycopg2://root:@airflow-sqlproxy-service.composer-system.svc.cluster.local:3306/composer-2-0-28-airflow-2-2-5-5ab01d14
+```
+
+As GCS Composer uses Postgres for the backend database, our Airflow connection configuration will be shaped as:
+
+```yaml
+connection:
+  type: Postgres
+  username: root
+  password: ...
+  hostPort: airflow-sqlproxy-service.composer-system.svc.cluster.local:3306
+  database: composer-2-0-28-airflow-2-2-5-5ab01d14
+```
+
+For more information on how to shape the YAML describing the Airflow metadata extraction, you can refer to the docs
+[here](https://docs.open-metadata.org/connectors/pipeline/airflow/cli#1-define-the-yaml-config).
+
+### Prepare the DAG!
+
+```python
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+
+
+config = """
+source:
+  type: airflow
+  serviceName: airflow_gcs_composer_k8s_op
+  serviceConnection:
+    config:
+      type: Airflow
+      hostPort: http://localhost:8080
+      numberOfStatus: 10
+      connection:
+        type: Postgres
+        username: root
+        password: ...
+        hostPort: airflow-sqlproxy-service.composer-system.svc.cluster.local:3306
+        database: composer-2-0-28-airflow-2-2-5-5ab01d14
+  sourceConfig:
+    config:
+      type: PipelineMetadata
+sink:
+  type: metadata-rest
+  config: {}
+workflowConfig:
+  openMetadataServerConfig:
+    hostPort: https://sandbox.open-metadata.org/api
+    enableVersionValidation: false
+    authProvider: openmetadata
+    securityConfig:
+      jwtToken: 
+"""
+
+
+with models.DAG(
+    "ingestion-k8s-operator",
+    schedule_interval="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["OpenMetadata"],
+) as dag:
+    KubernetesPodOperator(
+        task_id="ingest",
+        name="ingest",
+        cmds=["python", "main.py"],
+        image="openmetadata/ingestion-base:0.12.1",
+        namespace='default',
+        env_vars={"config": config, "pipelineType": "metadata"},
+        dag=dag,
+    )
+```
+
+Some remarks on this example code:
+
+#### Kubernetes Pod Operator
+
+You can name the task as you want (`task_id` and `name`). The important points here are `cmds`, which should not
+be changed, and `env_vars`. The `main.py` script that gets shipped within the image will load the env vars
+as they are shown, so only modify the content of the config YAML, not the keys of this dictionary.
+
+Note that the example uses the image `openmetadata/ingestion-base:0.12.1`. Update that accordingly for higher versions
+once they are released. Also, the image version should be aligned with your OpenMetadata server version to avoid
+incompatibilities.
+
+```python
+KubernetesPodOperator(
+    task_id="ingest",
+    name="ingest",
+    cmds=["python", "main.py"],
+    image="openmetadata/ingestion-base:0.12.1",
+    namespace='default',
+    env_vars={"config": config, "pipelineType": "metadata"},
+    dag=dag,
+)
+```
+
+You can find more information about the `KubernetesPodOperator` and how to tune its configurations
+[here](https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator).
+
+#### OpenMetadata Server Config
+
+The easiest approach here is to generate a bot with a **JWT** token directly from the OpenMetadata UI.
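+The bot and its token can usually be managed from the **Bots** section of the Settings page in the UI (for example,
+the default `ingestion-bot`), although the exact menu location may vary slightly between versions.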
+You can then use the following workflow config:
+
+```yaml
+workflowConfig:
+  openMetadataServerConfig:
+    hostPort: http://localhost:8585/api
+    authProvider: openmetadata
+    securityConfig:
+      jwtToken: 
+```
+
 ## Google SSO
 
 Against Google SSO we need to use the [Cloud Storage](https://cloud.google.com/composer/docs/concepts/cloud-storage)
@@ -127,3 +279,14 @@ against `/home/airflow/gcs/data/` in Airflow.
 
 You can see in the example above how our file is named `gcs_creds_beta.json`, which gets resolved in Airflow
 as `/home/airflow/gcs/data/gcs_creds_beta.json`.
+
+The workflow config here would look like:
+
+```yaml
+workflowConfig:
+  openMetadataServerConfig:
+    hostPort: https://sandbox.getcollate.io/api
+    authProvider: google
+    securityConfig:
+      secretKey: /home/airflow/gcs/data/gcs_creds_beta.json
+```
diff --git a/openmetadata-docs/content/menu.md b/openmetadata-docs/content/menu.md
index db7dcdd3630..20147c323b5 100644
--- a/openmetadata-docs/content/menu.md
+++ b/openmetadata-docs/content/menu.md
@@ -480,6 +480,8 @@ site_menu:
       url: /connectors/ingestion/versioning/event-notification-via-webhooks
     - category: Connectors / Ingestion / Ingestion Pipeline UI Deployment
       url: /connectors/ingestion/deployment
+    - category: Connectors / Ingestion / Run Connectors in your Airflow
+      url: /connectors/ingestion/run-connectors-in-airflow
     - category: How to guides