[Docs] - MWAA ECS Operator updates (#13815)

This commit is contained in:
Pere Miquel Brull 2023-11-02 07:50:54 +01:00 committed by GitHub
parent 306c8ecfbe
commit 6fd71be54c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -129,11 +129,25 @@ For example, if in the JSON from the Task Definition we see:
We'll need to use the `/ecs/openmetadata` below when configuring the policies.
### 2. Create a Service
### 2. Task Definition ARN & Networking
In the created cluster, we'll create a service to run the task. We will use:
- Compute option: `Launch Type` with `FARGATE`
- Application of type `Service`, where we'll select the Task Definition created above.
1. From the AWS Console, copy your task definition ARN. It will look something like this `arn:aws:ecs:<region>:<account>:task-definition/<name>:<revision>`.
2. Get the network details on where the task should execute. We will be using a JSON like:
```json
{
"awsvpcConfiguration": {
"subnets": [
"subnet-xxxyyyzzz",
"subnet-xxxyyyzzz"
],
"securityGroups": [
"sg-xxxyyyzzz"
],
"assignPublicIp": "ENABLED"
}
}
```
{% note %}
@ -156,9 +170,7 @@ be in the same network environment as MWAA to reach the underlying database.
"Effect": "Allow",
"Action": [
"ecs:RunTask",
"ecs:DescribeTasks",
"ecs:ListServices",
"ecs:DescribeServices"
"ecs:DescribeTasks"
],
"Resource": "*"
},
@ -203,30 +215,37 @@ And for the Log Group permissions
Note how you need to replace the `region`, `account-id` and the `log group` names for your Airflow Environment and ECS.
Moreover, we will need the name we gave to the ECS Cluster, as well as the container name of the Service.
### 4. Prepare the DAG
A DAG created using the ECS Operator will then look like this:
```python
from datetime import datetime
import yaml
from airflow import DAG
from http import client
from airflow import DAG
# If using Airflow < 2.5
# from airflow.providers.amazon.aws.operators.ecs import ECSOperator
# If using Airflow > 2.5
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
from airflow.utils.dates import days_ago
import boto3
CLUSTER_NAME="openmetadata-ingestion-cluster" # Replace value for CLUSTER_NAME with your information.
CLUSTER_NAME="openmetadata-ingestion" # Replace value for CLUSTER_NAME with your information.
CONTAINER_NAME="openmetadata-ingestion" # Replace value for CONTAINER_NAME with your information.
LAUNCH_TYPE="FARGATE"
TASK_DEFINITION = "arn:aws:ecs:<region>:<account>:task-definition/<name>:<revision>"
NETWORK_CONFIG = {
"awsvpcConfiguration": {
"subnets": [
"subnet-xxxyyyzzz",
"subnet-xxxyyyzzz"
],
"securityGroups": [
"sg-xxxyyyzzz"
],
"assignPublicIp": "ENABLED"
}
}
config = """
YAML config
"""
@ -239,14 +258,11 @@ with DAG(
start_date=days_ago(1),
is_paused_upon_creation=True,
) as dag:
client=boto3.client('ecs')
services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE)
service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns'])
ecs_operator_task = EcsRunTaskOperator(
task_id = "ecs_ingestion_task",
dag=dag,
cluster=CLUSTER_NAME,
task_definition=service['services'][0]['taskDefinition'],
task_definition=TASK_DEFINITION,
launch_type=LAUNCH_TYPE,
overrides={
"containerOverrides":[
@ -267,7 +283,7 @@ with DAG(
],
},
network_configuration=service['services'][0]['networkConfiguration'],
network_configuration=NETWORK_CONFIG,
awslogs_group="/ecs/ingest",
awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}",
)