Where `x.y.z` is the version of the OpenMetadata ingestion package. Note that the version needs to match the server version: if the server is running 1.1.0, then the ingestion package also needs to be 1.1.0.
The plugin parameter is a list of the sources that we want to ingest. An example would look like this: `openmetadata-ingestion[mysql,snowflake,s3]==1.1.0`.
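For illustration only (the constant names below are ours, not part of OpenMetadata), you can derive the pinned requirement from the server version and the plugin list so the two never drift apart:

```python
# Hypothetical sketch: build the ingestion requirement from the server version
# and the list of sources, so the pin always matches the server.
OM_SERVER_VERSION = "1.1.0"  # must match your OpenMetadata server version
PLUGINS = ["mysql", "snowflake", "s3"]  # the sources you want to ingest

INGESTION_REQUIREMENT = f"openmetadata-ingestion[{','.join(PLUGINS)}]=={OM_SERVER_VERSION}"
# -> "openmetadata-ingestion[mysql,snowflake,s3]==1.1.0"
```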
### Example
For example, preparing a metadata ingestion DAG with the `DockerOperator` will look as follows:
```python
from datetime import datetime

from airflow import models
from airflow.providers.docker.operators.docker import DockerOperator

config = """
<your YAML configuration>
"""

with models.DAG(
    "ingestion-docker-operator",
    schedule_interval="*/5 * * * *",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["OpenMetadata"],
) as dag:
    DockerOperator(
        command="python main.py",
        image="openmetadata/ingestion-base:0.13.2",
        environment={"config": config, "pipelineType": "metadata"},
        docker_url="unix://var/run/docker.sock",  # Allows the task to start Docker containers. The socket needs chmod 666 permissions
        tty=True,
        auto_remove="True",
        network_mode="host",  # To reach the OM server
        task_id="ingest",
        dag=dag,
    )
```
{% note %}
Make sure to adjust the DAG configurations (`schedule_interval`, `start_date`, etc.) as your use case requires.
{% /note %}
Note that the example uses the image `openmetadata/ingestion-base:0.13.2`. Update it accordingly as newer versions
are released. Also, the image version should be aligned with your OpenMetadata server version to avoid
incompatibilities.
Another important point is making sure that Airflow will be able to run Docker commands to create the task.
As our example was done with Airflow in Docker Compose, that meant setting `docker_url="unix://var/run/docker.sock"`.
The final important elements here are:
- `command="python main.py"`: This does not need to be modified, as we ship the `main.py` script in the image to trigger the workflow.
- `environment={"config": config, "pipelineType": "metadata"}`: In most cases you will just need to update the `config` string to point to the right connector.
Other supported values of `pipelineType` are `usage`, `lineage`, `profiler`, `dataInsight`, `elasticSearchReindex`, `dbt`, `application`, and `TestSuite`. Pass the value that matches the type of workflow
you want to execute, and make sure that the YAML config contains the elements required for that workflow.
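If you need to run more than one workflow type against the same service, a small helper keeps the task definitions consistent. The sketch below is illustrative only: the helper name and the placeholder YAML strings are not part of OpenMetadata, and it simply reuses the `command`, `image`, and `environment` keys described above.

```python
# Illustrative sketch (hypothetical helper): build one ingestion task per workflow type.
# Only the YAML config string and the pipelineType value change between tasks.
from airflow.providers.docker.operators.docker import DockerOperator

METADATA_CONFIG = """
<your metadata ingestion YAML>
"""

USAGE_CONFIG = """
<your usage YAML>
"""


def ingestion_task(task_id: str, config: str, pipeline_type: str) -> DockerOperator:
    """Run one OpenMetadata workflow type inside the ingestion-base image."""
    return DockerOperator(
        task_id=task_id,
        command="python main.py",  # shipped in the image; does not change per workflow
        image="openmetadata/ingestion-base:0.13.2",
        environment={"config": config, "pipelineType": pipeline_type},
        docker_url="unix://var/run/docker.sock",
        network_mode="host",
        tty=True,
    )


# Inside your `with models.DAG(...) as dag:` block:
#     ingestion_task("ingest_metadata", METADATA_CONFIG, "metadata")
#     ingestion_task("ingest_usage", USAGE_CONFIG, "usage")
```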
## Python Virtualenv Operator
You can use the [PythonVirtualenvOperator](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#pythonvirtualenvoperator)
when working with an Airflow installation where:
1. You don't want to install dependencies directly on your Airflow host,
2. You don't have any Docker runtime, or
3. Your Airflow host's Python version is not supported by `openmetadata-ingestion`.
### Prerequisites
As stated in Airflow's [docs](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#pythonvirtualenvoperator),
your Airflow host should have the `virtualenv` package installed.
Moreover, if you're planning to use a different Python version in the `virtualenv` than the one your Airflow uses,
you will need that version to be installed on the Airflow host.
For example, if Airflow runs with Python 3.7 but we want the `virtualenv` to use Python 3.9, we need to install
the following packages on the host: `gcc python3.9-dev python3.9-distutils`.
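As a minimal sketch under those assumptions (the task id, requirement pin, and placeholder callable below are ours, not OpenMetadata's), the Python version and the dependencies to install are declared directly on the operator; a complete example follows in the next section.

```python
# Minimal sketch: request Python 3.9 inside the virtualenv while Airflow itself
# runs on another Python version. The requirement pin is a placeholder.
from airflow.operators.python import PythonVirtualenvOperator


def metadata_ingestion_workflow():
    # Placeholder body; the real workflow function is shown in the example below.
    from metadata.workflow.metadata import MetadataWorkflow  # resolved inside the venv


# Inside your DAG definition:
ingest_task = PythonVirtualenvOperator(
    task_id="ingest_using_virtualenv",
    python_callable=metadata_ingestion_workflow,
    requirements=["openmetadata-ingestion[mysql]==1.1.0"],  # match your server version
    python_version="3.9",  # needs the python3.9 packages installed on the Airflow host
    system_site_packages=False,
)
```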
### Example
In this example, we will be using a different Python version than the one Airflow is running on:
```python
from datetime import timedelta

from airflow import DAG

try:
    from airflow.operators.python import PythonVirtualenvOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonVirtualenvOperator

from airflow.utils.dates import days_ago

default_args = {
    "owner": "user_name",
    "email": ["username@org.com"],
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(seconds=10),
    "execution_timeout": timedelta(minutes=60),
}


def metadata_ingestion_workflow():
    from metadata.workflow.metadata import MetadataWorkflow