- We need to install the [openmetadata-ingestion](https://pypi.org/project/openmetadata-ingestion/) package in the MWAA environment
- The installation can clash with existing libraries
- Upgrading the OM version will require repeating the installation process
To install the package, we need to update the `requirements.txt` file of the MWAA environment and add the following line:
```
openmetadata-ingestion[<plugin>]==x.y.z
```
Where `x.y.z` is the version of the OpenMetadata ingestion package. Note that the version needs to match the server version. If we are using the server at 0.12.2, then the ingestion package also needs to be 0.12.2.
The plugin parameter is a list of the sources that we want to ingest. An example would look like this: `openmetadata-ingestion[mysql,snowflake,s3]==0.12.2.2`.
A DAG deployed using a Python Operator would then look as follows:
```python
import yaml
from datetime import timedelta

from airflow import DAG

# PythonOperator moved modules between Airflow 1.x and 2.x
try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago

from metadata.ingestion.api.workflow import Workflow

default_args = {
    "retries": 3,
    "retry_delay": timedelta(seconds=10),
    "execution_timeout": timedelta(minutes=60),
}

config = """
YAML config
"""


def metadata_ingestion_workflow():
    # Build and run the ingestion workflow from the YAML configuration
    workflow_config = yaml.safe_load(config)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "redshift_ingestion",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_redshift",
        python_callable=metadata_ingestion_workflow,
    )
```
Where you can update the YAML configuration and workflow classes accordingly (see the sketch below for an illustrative configuration). Further examples on how to
run the ingestion can be found in the documentation (e.g., [Snowflake](https://docs.open-metadata.org/connectors/database/snowflake)).
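For illustration, a filled-in `config` string for a Redshift metadata ingestion could look roughly like the sketch below. The connection fields (`hostPort`, `username`, `password`, `database`) and server settings are placeholders following the general connector configuration pattern; double-check them against the Redshift connector documentation before using them.

```python
# Hedged sketch of the `config` string used in the DAG above. Field names
# follow the general connector configuration pattern; verify them against
# the Redshift connector docs.
config = """
source:
  type: redshift
  serviceName: aws_redshift
  serviceConnection:
    config:
      type: Redshift
      hostPort: <cluster-endpoint>:5439
      username: <username>
      password: <password>
      database: <database>
  sourceConfig:
    config:
      type: DatabaseMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>
"""
```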
### Extracting MWAA Metadata
As the ingestion process will run locally within MWAA, we can prepare a DAG with the following YAML configuration:
```yaml
source:
  type: airflow
  serviceName: airflow_mwaa
  serviceConnection:
    config:
      type: Airflow
      hostPort: http://localhost:8080
      numberOfStatus: 10
      connection:
        type: Backend
  sourceConfig:
    config:
      type: PipelineMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  loggerLevel: INFO
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>
```
Using the connection type `Backend` will pick up the Airflow database session directly at runtime.
## Ingestion Workflows as an ECS Operator
### PROs
- Completely isolated environment
- Easy to update to new versions
### CONs
- We need to set up an ECS cluster and the required policies in MWAA to connect to ECS and handle Log Groups.
We will now describe the required steps.
Note that this whole process is based on the official AWS [docs](https://docs.aws.amazon.com/mwaa/latest/userguide/samples-ecs-operator.html).
- The required image is `docker.getcollate.io/openmetadata/ingestion-base:x.y.z`
- The same logic as above applies. The `x.y.z` version needs to match the server version. For example, `docker.getcollate.io/openmetadata/ingestion-base:0.13.2`
<Image src="/images/openmetadata/connectors/deployment/create-ecs-service-networking.png" alt="Create ECS Service Networking"/>
### 2. Update MWAA Executor Role policies
Identify your MWAA executor role. This can be obtained from the details view of your MWAA environment.
First, add the following policy to the role: [AmazonECS_FullAccess](https://us-east-1.console.aws.amazon.com/iam/home#/policies/arn:aws:iam::aws:policy/AmazonECS_FullAccess$jsonEditor).
Note that, depending on the kind of workflow you will be deploying, the YAML configuration will need to be updated following
the official OpenMetadata docs, and the value of the `pipelineType` configuration will need to hold one of the following values:
- `metadata`
- `usage`
- `lineage`
- `profiler`
- `TestSuite`
These values are based on the `PipelineType` [JSON Schema definitions](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json#L14).
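Putting the pieces together, the ingestion task in MWAA could be wired up with the ECS Operator roughly as in the sketch below. The cluster, task definition, container name, network settings and log group are placeholders for the resources created in the previous steps, the container command is an assumption, and the operator import path may differ depending on the Amazon provider version installed in your MWAA environment.

```python
from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import ECSOperator
from airflow.utils.dates import days_ago

with DAG(
    "ecs_ingestion",
    description="Runs an OpenMetadata ingestion workflow on ECS",
    start_date=days_ago(1),
    catchup=False,
) as dag:
    ingest_task = ECSOperator(
        task_id="ingest_metadata",
        cluster="<ecs-cluster-name>",              # cluster created for the ingestion tasks
        task_definition="<task-definition-name>",  # task definition using the ingestion-base image
        launch_type="FARGATE",
        overrides={
            "containerOverrides": [
                {
                    "name": "<container-name>",
                    # Placeholder: the command depends on how the task
                    # definition and image entrypoint are configured
                    "command": ["<ingestion command>"],
                }
            ]
        },
        network_configuration={
            "awsvpcConfiguration": {
                "subnets": ["<subnet-id>"],
                "securityGroups": ["<security-group-id>"],
                "assignPublicIp": "ENABLED",
            }
        },
        awslogs_group="<log-group>",  # log group the MWAA executor role can read
        awslogs_stream_prefix="ecs",
    )
```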
### Extracting MWAA Metadata
To extract MWAA information, we will need to take a couple of points into consideration:
1. How to get the underlying database connection info, and
2. How to make sure we can reach such database.
#### Getting the DB connection
The happy path would be going to the `Airflow UI > Admin > Configurations` and finding the `sql_alchemy_conn` parameter.
However, MWAA does not expose this information. Instead, we need to create a DAG that retrieves the connection details
once; the DAG can be deleted afterwards. We will use a Python Operator that retrieves the Airflow Session data:
```python
import logging
import os
from datetime import timedelta
import yaml
from airflow import DAG
# PythonOperator moved modules between Airflow 1.x and 2.x
try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator