
* wip
* feat: trigger external apps with override config
  - Added functionality in openmetadata-airflow-apis to trigger a DAG with configuration.
  - Modified the openmetadata-airflow-apis application runner to accept an override config from params.
  - Added an overloaded runPipeline with `Map<String, Object> config` to allow triggering apps with configuration. We might want to expand this to all ingestion pipelines; for now it is just for apps.
  - Implemented an example external app that can be used to test the functionality of external apps. The app can be enabled by setting the `ENABLE_APP_HelloPipelines=true` environment variable.
* fix class doc for application
* fixed README for airflow apis
* fixes
* set HelloPipelines to disabled by default
* fixed basedpyright errors
* fixed app schema
* reduced airflow client runPipeline to an overload with null config; removed duplicate call to runPipeline in AppResource
* Update openmetadata-docs/content/v1.7.x-SNAPSHOT/developers/applications/index.md

  Co-authored-by: Matias Puerta <matias@getcollate.io>

* deleted documentation file

---------

Co-authored-by: Matias Puerta <matias@getcollate.io>

# OpenMetadata Airflow Managed DAGS Api

This is a plugin for Apache Airflow >= 1.10 and Airflow >= 2.x that exposes REST APIs to deploy an OpenMetadata workflow definition and manage DAGs and tasks.
## Development

The file `development/airflow/airflow.cfg` contains configuration that runs against the airflow server deployed by the quick-start and development compose files.

You can run the following command to start the development environment:

```bash
export AIRFLOW_HOME=$(pwd)/openmetadata-airflow-managed-api/development/airflow
airflow webserver
```
## Requirements

First, make sure that Airflow is properly installed with version 2.3.3. From the docs:

```bash
pip install "apache-airflow==2.3.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.3.3/constraints-3.9.txt"
```

Then, install the following package in your scheduler and webserver Python environments:

```bash
pip install openmetadata-airflow-managed-apis
```

## Configuration

Add the following section to airflow.cfg, substituting AIRFLOW_HOME with your airflow installation home:

```ini
[openmetadata_airflow_apis]
dag_generated_configs = {AIRFLOW_HOME}/dag_generated_configs
```

## Deploy

- Install the package
- Create the directory for generated DAG configs: `mkdir -p {AIRFLOW_HOME}/dag_generated_configs`
- (re)start the airflow webserver and scheduler:

```bash
airflow webserver
airflow scheduler
```
## Validate

You can check that the plugin is correctly loaded by going to http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/restapi, or by accessing the REST_API_PLUGIN view through the Admin dropdown.
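
If you prefer a scripted check, the following is a minimal sketch using the third-party Python `requests` package (not part of this plugin). It only verifies that the webserver answers on the plugin path shown above; depending on your auth setup you may get a redirect to the login page instead of a 200.

```python
import requests

# Hypothetical values for illustration -- replace with your Airflow host and port.
AIRFLOW_HOST = "localhost"
AIRFLOW_PORT = 8080

# A 200 (or a 302 redirect to the login page) means the webserver is up
# and the plugin route is registered.
resp = requests.get(
    f"http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/restapi",
    allow_redirects=False,
)
print(resp.status_code)
```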
## APIs
### Enable JWT Auth tokens

The plugin enables JWT token-based authentication for Airflow versions 1.10.4 or higher when RBAC support is enabled.
#### Generating the JWT access token

```bash
curl -XPOST http://localhost:8080/api/v1/security/login -H "Content-Type: application/json" -d '{"username":"admin", "password":"admin", "refresh":true, "provider": "db"}'
```

##### Examples:

```bash
curl -X POST http://localhost:8080/api/v1/security/login -H "Content-Type: application/json" -d '{"username":"admin", "password":"admin", "refresh":true, "provider": "db"}'
```

Sample response, which includes access_token and refresh_token:

```json
{
"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyMTc4MzgsIm5iZiI6MTYwNDIxNzgzOCwianRpIjoiMTI4ZDE2OGQtMTZiOC00NzU0LWJiY2EtMTEyN2E2ZTNmZWRlIiwiZXhwIjoxNjA0MjE4NzM4LCJpZGVudGl0eSI6MSwiZnJlc2giOnRydWUsInR5cGUiOiJhY2Nlc3MifQ.xSWIE4lR-_0Qcu58OiSy-X0XBxuCd_59ic-9TB7cP9Y",
"refresh_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyMTc4MzgsIm5iZiI6MTYwNDIxNzgzOCwianRpIjoiZjA5NTNkODEtNWY4Ni00YjY0LThkMzAtYzg5NTYzMmFkMTkyIiwiZXhwIjoxNjA2ODA5ODM4LCJpZGVudGl0eSI6MSwidHlwZSI6InJlZnJlc2gifQ.VsiRr8_ulCoQ-3eAbcFz4dQm-y6732QR6OmYXsy4HLk"
}
```

By default, the JWT access token is valid for 15 minutes and the refresh token is valid for 30 days. You can renew the access token with the help of the refresh token, as shown below.
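
For reference, the same login call can be made from Python. This is only a sketch: it assumes the third-party `requests` package, the default `admin`/`admin` credentials, and the `localhost:8080` endpoint used in the curl example above.

```python
import requests

# Same payload as the curl example above; adjust credentials, host and port as needed.
payload = {"username": "admin", "password": "admin", "refresh": True, "provider": "db"}

resp = requests.post(
    "http://localhost:8080/api/v1/security/login",
    json=payload,  # sent as a JSON body with Content-Type: application/json
)
resp.raise_for_status()

tokens = resp.json()
access_token = tokens["access_token"]    # short-lived (about 15 minutes by default)
refresh_token = tokens["refresh_token"]  # long-lived (about 30 days by default)
```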
#### Renewing the Access Token

```bash
curl -X POST "http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/security/refresh" -H 'Authorization: Bearer <refresh_token>'
```

##### Examples:

```bash
curl -X POST "http://localhost:8080/api/v1/security/refresh" -H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyMTc4MzgsIm5iZiI6MTYwNDIxNzgzOCwianRpIjoiZjA5NTNkODEtNWY4Ni00YjY0LThkMzAtYzg5NTYzMmFkMTkyIiwiZXhwIjoxNjA2ODA5ODM4LCJpZGVudGl0eSI6MSwidHlwZSI6InJlZnJlc2gifQ.VsiRr8_ulCoQ-3eAbcFz4dQm-y6732QR6OmYXsy4HLk'
```

The sample response returns the renewed access token as shown below:

```json
{
"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyODQ2OTksIm5iZiI6MTYwNDI4NDY5OSwianRpIjoiZDhhN2IzMmYtMWE5Zi00Y2E5LWFhM2ItNDEwMmU3ZmMyMzliIiwiZXhwIjoxNjA0Mjg1NTk5LCJpZGVudGl0eSI6MSwiZnJlc2giOmZhbHNlLCJ0eXBlIjoiYWNjZXNzIn0.qY2e-bNSgOY-YboinOoGqLfKX9aQkdRjo025mZwBadA"
}
```
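
The refresh call looks like this in Python, again just a sketch assuming the `requests` package and the refresh_token obtained at login:

```python
import requests

refresh_token = "<refresh_token>"  # taken from the login response above

# Note: the *refresh* token goes in the Authorization header for this endpoint.
resp = requests.post(
    "http://localhost:8080/api/v1/security/refresh",
    headers={"Authorization": f"Bearer {refresh_token}"},
)
resp.raise_for_status()
access_token = resp.json()["access_token"]  # renewed access token
```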
#### Enable API requests with JWT

If the Authorization header is not added to the API request, the response is an error:

```json
{"msg": "Missing Authorization Header"}
```

Pass the additional Authorization: Bearer <access_token> header in the REST API request.

##### Examples:

```bash
curl -X GET -H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyODQ2OTksIm5iZiI6MTYwNDI4NDY5OSwianRpIjoiZDhhN2IzMmYtMWE5Zi00Y2E5LWFhM2ItNDEwMmU3ZmMyMzliIiwiZXhwIjoxNjA0Mjg1NTk5LCJpZGVudGl0eSI6MSwiZnJlc2giOmZhbHNlLCJ0eXBlIjoiYWNjZXNzIn0.qY2e-bNSgOY-YboinOoGqLfKX9aQkdRjo025mZwBadA' http://localhost:8080/rest_api/api\?api\=dag_state\&dag_id\=dag_test\&run_id\=manual__2020-10-28T17%3A36%3A28.838356%2B00%3A00
```
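
The equivalent authenticated request in Python, as a sketch assuming the `requests` package and a valid access token; the dag_id and run_id values are just the ones from the curl example above:

```python
import requests

access_token = "<access_token>"  # from the login or refresh response

# requests takes care of URL-encoding the query parameters.
resp = requests.get(
    "http://localhost:8080/rest_api/api",
    params={
        "api": "dag_state",
        "dag_id": "dag_test",
        "run_id": "manual__2020-10-28T17:36:28.838356+00:00",
    },
    headers={"Authorization": f"Bearer {access_token}"},
)
print(resp.json())
```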
### Using the API

Once you deploy the plugin and restart the webserver, you can start to use the REST API. Below you will see the endpoints that are supported.

Note: if RBAC is enabled, http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/rest_api/ will show the supported endpoints and provide a form for you to test submitting requests to them.
#### deploy_dag

##### Description:

- Deploy a new DAG and refresh it in the session.

##### Endpoint:

http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/rest_api/api?api=deploy_dag

##### Method:

- POST

##### POST request Arguments:

```json
{
  "workflow": {
    "name": "test_ingestion_x_35",
    "force": "true",
    "pause": "false",
    "unpause": "true",
    "dag_config": {
      "test_ingestion_x_35": {
        "default_args": {
          "owner": "harsha",
          "start_date": "2021-10-29T00:00:00.000Z",
          "end_date": "2021-11-05T00:00:00.000Z",
          "retries": 1,
          "retry_delay_sec": 300
        },
        "schedule_interval": "0 3 * * *",
        "concurrency": 1,
        "max_active_runs": 1,
        "dagrun_timeout_sec": 60,
        "default_view": "tree",
        "orientation": "LR",
        "description": "this is an example dag!",
        "tasks": {
          "task_1": {
            "operator": "airflow.operators.python_operator.PythonOperator",
            "python_callable_name": "metadata_ingestion_workflow",
            "python_callable_file": "metadata_ingestion.py",
            "op_kwargs": {
              "workflow_config": {
                "metadata_server": {
                  "config": {
                    "api_endpoint": "http://localhost:8585/api",
                    "auth_provider_type": "no-auth"
                  },
                  "type": "metadata-server"
                },
                "sink": {
                  "config": {
                    "es_host": "localhost",
                    "es_port": 9200,
                    "index_dashboards": "true",
                    "index_tables": "true",
                    "index_topics": "true"
                  },
                  "type": "elasticsearch"
                },
                "source": {
                  "config": {
                    "include_dashboards": "true",
                    "include_tables": "true",
                    "include_topics": "true",
                    "limit_records": 10
                  },
                  "type": "metadata"
                }
              }
            }
          }
        }
      }
    }
  }
}
```
##### Examples:

```bash
curl -H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MzU2NTE1MDAsIm5iZiI6MTYzNTY1MTUwMCwianRpIjoiNWQyZTM3ZDYtNjdiYS00NGZmLThjOWYtMDM0ZTQyNGE3MTZiIiwiZXhwIjoxNjM1NjUyNDAwLCJpZGVudGl0eSI6MSwiZnJlc2giOnRydWUsInR5cGUiOiJhY2Nlc3MifQ.DRUYCAiMh5h2pk1MZZJ4asyVFC20pu35DuAANQ5GxGw' -H 'Content-Type: application/json' -d "@test_ingestion_config.json" -X POST http://localhost:8080/rest_api/api\?api\=deploy_dag
```

##### response:

```json
{"message": "Workflow [test_ingestion_x_35] has been created", "status": "success"}
```
#### delete_dag

##### Description:

- Delete a DAG based on its dag_id.

##### Endpoint:

http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/rest_api/api?api=delete_dag&dag_id=value

##### Method:

- GET

##### GET request Arguments:

- dag_id - string - The id of the DAG.

##### Examples:

```bash
curl -X GET "http://localhost:8080/rest_api/api?api=delete_dag&dag_id=dag_test"
```

##### response:

```json
{
  "message": "DAG [dag_test] deleted",
  "status": "success"
}
```
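
A matching Python sketch for delete_dag, under the same assumptions (the `requests` package, a valid access token, and the dag_test DAG id from the example):

```python
import requests

access_token = "<access_token>"  # JWT access token

resp = requests.get(
    "http://localhost:8080/rest_api/api",
    params={"api": "delete_dag", "dag_id": "dag_test"},
    headers={"Authorization": f"Bearer {access_token}"},
)
resp.raise_for_status()
print(resp.json())  # e.g. {"message": "DAG [dag_test] deleted", "status": "success"}
```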