dushayntAW 0e698173fe
fix(ingestion/airflow-plugin): fixed missing inlet/outlets (#11101)
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
2024-08-18 20:30:27 -04:00

75 lines
1.9 KiB
Python

import logging
from datetime import datetime, timedelta
from typing import Any, List, Tuple
from airflow import DAG
from airflow.models.baseoperator import BaseOperator
from datahub_airflow_plugin.entities import Dataset
logger = logging.getLogger(__name__)
class CustomOperator(BaseOperator):
    """Example operator that attaches Snowflake lineage to its own task.

    After running its business logic, ``execute`` computes the input/output
    Dataset lineage and assigns it to the task's ``inlets``/``outlets`` so the
    DataHub Airflow plugin can pick it up at runtime.
    """

    def __init__(self, name: str, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        # Free-form operator name; not used for lineage, kept for the example.
        self.name = name

    def execute(self, context: Any) -> None:
        """Run the task's work, then attach computed lineage to the task.

        The inlets/outlets are set on ``context["ti"].task`` (the live task
        object) rather than on ``self`` so the plugin's listener sees them.
        """
        logger.info("executing other code here")

        # In a real operator these would be discovered dynamically
        # (e.g. from a Snowflake query); hard-coded here for the example.
        input_tables = ["mydb.schema.tableA", "mydb.schema.tableB"]
        output_tables = ["mydb.schema.tableD"]

        inlets, outlets = self._get_sf_lineage(input_tables, output_tables)
        context["ti"].task.inlets = inlets
        context["ti"].task.outlets = outlets

    @staticmethod
    def _get_sf_lineage(
        input_tables: List[str], output_tables: List[str]
    ) -> Tuple[List[Dataset], List[Dataset]]:
        """Map Snowflake table names to DataHub ``Dataset`` lineage entities.

        :param input_tables: fully-qualified upstream table names.
        :param output_tables: fully-qualified downstream table names.
        :return: ``(inlets, outlets)`` as lists of snowflake ``Dataset``.
        """
        inlets = [Dataset(platform="snowflake", name=t) for t in input_tables]
        outlets = [Dataset(platform="snowflake", name=t) for t in output_tables]
        return inlets, outlets
# Defaults applied to every task created under the DAG below.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    start_date=datetime(2023, 1, 1),
    email=["jdoe@example.com"],
    email_on_failure=False,
    execution_timeout=timedelta(minutes=5),
)
# Example DAG wiring the CustomOperator above into a single task.
# schedule_interval=None means the DAG only runs when triggered manually.
with DAG(
    dag_id="custom_operator_dag",
    default_args=default_args,
    description="An example dag with custom operator",
    schedule_interval=None,
    tags=["example_tag"],
    catchup=False,
    default_view="tree",
) as dag:
    # The operator computes and attaches its own inlets/outlets in execute().
    custom_task = CustomOperator(
        task_id="custom_task_id",
        name="custom_name",
        dag=dag,
    )