In this page we are going to explain how OpenMetadata internally deploys the workflows that are configured from the UI.
As of now, OpenMetadata uses Airflow under the hood as a scheduler for the Ingestion Pipelines.
This is the right place if you are curious about our current approach, or if you are looking to contribute by
adding the implementation to deploy workflows to another tool directly from the UI.
{% note %}
Here we are talking about an internal implementation detail. Do not confuse the information shared here with the
pipeline services supported as connectors for metadata extraction.
For example, we use Airflow as an internal component to deploy and schedule ingestion workflows, but we can also extract
metadata from Airflow. Fivetran, on the other hand, is a possible metadata source, but we do not use it to deploy and schedule workflows.
{% /note %}
## Before Reading
This is a rather deep-dive guide. We recommend that you get familiar with the overall OpenMetadata architecture first. You
can find that [here](/main-concepts/high-level-design).
## System Context
Everything in OpenMetadata is centralized and managed via the API, so the Workflow's lifecycle is also fully managed
via the OpenMetadata server APIs. Moreover, the `IngestionPipeline` Entity is defined in a JSON Schema that you
can find [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json).
Note how OpenMetadata here acts as middleware, connecting the actions triggered in the UI to external orchestration
systems, which manage the heavy lifting of getting a workflow created, scheduled, and run. Out of the box,
OpenMetadata ships with the required logic to manage this connection to Airflow. Any workflow triggered from the UI won't
run directly on the server; instead, it will be handled as a DAG in Airflow.
The whole process here will describe in further detail how we transform an incoming request from the UI - in the shape
of an Ingestion Pipeline Entity - to a DAG that Airflow can fully manage.
## OpenMetadata Server Container Diagram
The main difference between an `IngestionPipeline` and any other Entity in OpenMetadata is that we need to bind some
logic to an external system. Therefore, creating the Entity itself is not enough: we also need an extra
component handling the communication between the Server and the Orchestrator.
We then have the `IngestionPipelineResource` defining not only the endpoints for managing the Entity, but also routes
to handle the workflow lifecycle, such as deploying it, triggering it or deleting it from the orchestrator.
- The Entity management is still handled by the `EntityRepository` and saved in the Storage Layer as JSON,
- While the communication with the Orchestrator is handled by the `PipelineServiceClient`.
While the endpoints are directly defined in the `IngestionPipelineResource`, the `PipelineServiceClient` is an interface
that decouples how OpenMetadata communicates with the Orchestrator, as different external systems will need different
calls and data to be sent.
- You can find the `PipelineServiceClient` abstraction [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClient.java),
- And the `AirflowRESTClient` implementation [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java).
The clients that implement the abstractions from the `PipelineServiceClient` are merely a translation layer between the
information received in the shape of an `IngestionPipeline` Entity, and the specific requirements of each Orchestrator.
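To make the role of this translation layer concrete, here is a hypothetical Python analogue of the `PipelineServiceClient` abstraction (the real interface is the Java class linked above; the method names and signatures here are illustrative, not the actual API):

```python
from abc import ABC, abstractmethod


class PipelineServiceClient(ABC):
    """Hypothetical sketch: translates an IngestionPipeline Entity
    into calls against a specific Orchestrator."""

    @abstractmethod
    def deploy_pipeline(self, ingestion_pipeline: dict) -> None:
        """Create or update the workflow in the Orchestrator."""

    @abstractmethod
    def run_pipeline(self, pipeline_name: str) -> None:
        """Trigger a run of an already deployed workflow."""

    @abstractmethod
    def delete_pipeline(self, pipeline_name: str) -> None:
        """Remove the workflow from the Orchestrator."""


class LoggingClient(PipelineServiceClient):
    """Toy implementation that just records the calls it receives,
    standing in for e.g. an Airflow-specific client."""

    def __init__(self):
        self.calls = []

    def deploy_pipeline(self, ingestion_pipeline: dict) -> None:
        self.calls.append(("deploy", ingestion_pipeline["name"]))

    def run_pipeline(self, pipeline_name: str) -> None:
        self.calls.append(("run", pipeline_name))

    def delete_pipeline(self, pipeline_name: str) -> None:
        self.calls.append(("delete", pipeline_name))
```

Each concrete client (Airflow or otherwise) would fill these methods in with whatever requests its Orchestrator expects.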
When creating a new workflow from the UI, or when editing it, two calls happen:
- A `POST` or `PUT` call to create or update the `Ingestion Pipeline Entity`,
- A `/deploy` HTTP call to the `IngestionPipelineResource` to trigger the deployment of the new or updated DAG in the Orchestrator.
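As a sketch, assuming a local OpenMetadata server on its default port, the shape of those two calls could be modeled like this (the base URL and the `/deploy/{id}` path are illustrative assumptions, not the definitive routes):

```python
# Assumed base URL of a local OpenMetadata server (illustrative).
BASE = "http://localhost:8585/api/v1/services/ingestionPipelines"


def upsert_call(pipeline: dict) -> tuple:
    # 1. PUT to create or update the Ingestion Pipeline Entity itself.
    return ("PUT", BASE, pipeline)


def deploy_call(pipeline_id: str) -> tuple:
    # 2. POST to the /deploy route so the server asks the Orchestrator
    #    to create or refresh the DAG.
    return ("POST", f"{BASE}/deploy/{pipeline_id}", None)
```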
Based on its [JSON Schema](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json),
there are a few properties about the Ingestion Pipeline we can highlight:
**1.** `service`: a pipeline is linked via an Entity Reference to a Service Entity or a Test Suite Entity. From the service we pick up the connection details to the source system.
**2.** `pipelineType`: which represents the type of workflow to be created. This flag will be used down the line in the Orchestrator
logic to properly create the required Python classes (e.g., `Workflow`, `ProfilerWorkflow`, `TestSuiteWorkflow`, etc.).
**3.** `sourceConfig`: which depends on the pipeline type and defines how the pipeline should behave (e.g., marking the ingestion of views as `false`).
**4.** `openMetadataServerConnection`: defining how to reach the OM server with properties such as host, auth configuration, etc.
**5.** `airflowConfig`: with Airflow specific configurations about the DAG such as the schedule.
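To make these properties concrete, here is an illustrative payload written as a Python dict (all values are made up; the authoritative shape is the JSON Schema linked above):

```python
# Illustrative IngestionPipeline payload; every value here is invented
# for the example, and the real field set comes from the JSON Schema.
ingestion_pipeline = {
    "name": "mysql_metadata_pipeline",
    # Entity Reference to the Service this pipeline extracts from
    "service": {"id": "00000000-0000-0000-0000-000000000000", "type": "databaseService"},
    # Drives which Python Workflow class the Orchestrator will build
    "pipelineType": "metadata",
    # Behavior flags, dependent on the pipeline type
    "sourceConfig": {"config": {"type": "DatabaseMetadata", "includeViews": False}},
    # Airflow-specific DAG settings; in practice only the schedule is used
    "airflowConfig": {"scheduleInterval": "*/30 * * * *"},
}
```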
{% note %}
While we have yet to update the `airflowConfig` property to be more generic, the only field actually being used is the
schedule. You might see this property here, but the whole process can still support other Orchestrators. We will clean
this up in future releases.
{% /note %}
Then, the actual deployment logic is handled by the class implementing the Pipeline Service Client. For this example,
it will be the [AirflowRESTClient](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java).
First, let's see what is needed to instantiate the Airflow REST Client:
```java
public AirflowRESTClient(AirflowConfiguration airflowConfig) {
  super(
      airflowConfig.getUsername(),
      airflowConfig.getPassword(),
      airflowConfig.getApiEndpoint(),
      airflowConfig.getHostIp(),
      airflowConfig.getTimeout());
}
```
If we focus on the important properties here, we can see:
- username,
- password,
- and an API Endpoint
As we use Basic Auth to connect to Airflow's API, those are all the necessary ingredients. This is Airflow specific,
so any other Pipeline Service can tune these parameters as needed.
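As a minimal sketch, this is how those three ingredients turn into a Basic Auth header for requests against Airflow's API (the credentials and endpoint are made-up examples):

```python
import base64

# Made-up connection properties, mirroring the constructor arguments above.
API_ENDPOINT = "http://localhost:8080"
USERNAME = "admin"
PASSWORD = "admin"


def basic_auth_header(username: str, password: str) -> dict:
    # Basic Auth is just "user:password" base64-encoded in a header.
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return {"Authorization": f"Basic {token}"}
```

Any HTTP client can then attach this header to calls against `API_ENDPOINT`.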
If we then check what has been implemented in the `deployPipeline` method of the Airflow REST Client, we will see
that it is just calling a `/deploy` endpoint from its API root. What is this endpoint? What does it do?
### Airflow Managed APIs
Airflow has many benefits, but it does not support creating DAGs dynamically via its API. That is why we have
created the [OpenMetadata Airflow Managed APIs](https://github.com/open-metadata/OpenMetadata/tree/main/openmetadata-airflow-apis)
Python package. This is a plugin that can be installed in Airflow which adds a set of endpoints (all that is needed for
the Pipeline Service implementation) and, more specifically, helps us build the bridge between the `IngestionPipeline` Entity
and whatever Airflow requires to create a DAG.
We know that to create a new DAG in Airflow we need a Python file to be placed under the `AIRFLOW_HOME/dags` directory (by default).
Then, calling the `/deploy` endpoint takes care of the necessary steps to create such a file.
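As a rough sketch of what such a deploy step has to do, the following renders a tiny DAG file and drops it into a dags folder (the template and function are illustrative, not the code the Managed APIs actually generate):

```python
import os

# Illustrative template; a real deploy endpoint renders a richer DAG file.
DAG_TEMPLATE = """\
# Auto-generated by a hypothetical deploy endpoint
from airflow import DAG

with DAG(dag_id="{dag_id}", schedule_interval="{schedule}") as dag:
    ...
"""


def deploy_dag(dags_folder: str, dag_id: str, schedule: str) -> str:
    """Render the template and write <dag_id>.py under the dags folder,
    where Airflow's scheduler will pick it up."""
    path = os.path.join(dags_folder, f"{dag_id}.py")
    with open(path, "w") as f:
        f.write(DAG_TEMPLATE.format(dag_id=dag_id, schedule=schedule))
    return path
```

Once the file lands under `AIRFLOW_HOME/dags`, Airflow's scheduler discovers and manages it like any other DAG.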
What is important to notice here is that, in order to run a metadata ingestion workflow,
we just need the following few lines of Python code:
```python
from metadata.workflow.metadata import MetadataWorkflow