From 6a1b1190a7a46a8064321a9a42c5ad630bc9075d Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Tue, 11 Jul 2023 07:35:22 +0200 Subject: [PATCH] Fix #12313 - Airflow lineage parsing and schedules (#12339) * Fix docs * Fix lineage and intervals * Format --- ingestion/src/metadata/cli/db_dump.py | 1 - .../dashboard/domodashboard/metadata.py | 2 +- .../source/pipeline/airflow/lineage_parser.py | 4 + .../source/pipeline/airflow/metadata.py | 148 ++----------- .../source/pipeline/airflow/models.py | 4 + .../source/pipeline/airflow/utils.py | 63 ++++++ .../unit/topology/pipeline/test_airflow.py | 200 ++++++++++++++++++ .../connectors/pipeline/airflow/mwaa.md | 3 - 8 files changed, 294 insertions(+), 131 deletions(-) create mode 100644 ingestion/src/metadata/ingestion/source/pipeline/airflow/utils.py create mode 100644 ingestion/tests/unit/topology/pipeline/test_airflow.py diff --git a/ingestion/src/metadata/cli/db_dump.py b/ingestion/src/metadata/cli/db_dump.py index f2d8bdd7d28..0339e7ade43 100644 --- a/ingestion/src/metadata/cli/db_dump.py +++ b/ingestion/src/metadata/cli/db_dump.py @@ -71,7 +71,6 @@ def clean_str(raw: str, engine: Engine) -> str: @singledispatch -# pylint: disable=line-too-long def clean_col(column_raw: Optional[Union[dict, str]], engine: Engine) -> str: return ( single_quote_wrap(clean_str(str(column_raw), engine)) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py index 22ba4bf6ac8..45d27a6bd4f 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py @@ -104,7 +104,7 @@ class DomodashboardSource(DashboardServiceSource): if user: return EntityReference(id=user.id.__root__, type="user") logger.warning( - f"No user for found for email {owner_details['email']} in OMD" + f"No user found with email [{owner_details['email']}] in OMD" ) except Exception as exc: logger.warning( diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py index 6f782fb0a0e..d999dc71371 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py @@ -88,6 +88,10 @@ def parse_xlets(xlet: List[dict]) -> Optional[Dict[str, List[str]]]: Parse airflow xlets for V1 :param xlet: airflow v2 xlet dict :return: dictionary of xlet list or None + + [{'__var': {'tables': ['sample_data.ecommerce_db.shopify.fact_order']}, + '__type': 'dict'}] + """ # This branch is for lineage parser op if isinstance(xlet, list) and len(xlet) and isinstance(xlet[0], dict): diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py index 51b3460f583..741f307eafa 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py @@ -12,8 +12,8 @@ Airflow source to extract metadata from OM UI """ import traceback -from datetime import datetime, timedelta -from typing import Any, Iterable, List, Optional, cast +from datetime import datetime +from typing import Iterable, List, Optional, cast from airflow.models import BaseOperator, DagRun, TaskInstance from airflow.models.serialized_dag import SerializedDagModel @@ 
-49,10 +49,9 @@ from metadata.ingestion.source.pipeline.airflow.models import ( AirflowDag, AirflowDagDetails, ) +from metadata.ingestion.source.pipeline.airflow.utils import get_schedule_interval from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource -from metadata.utils.constants import TIMEDELTA from metadata.utils.helpers import clean_uri, datetime_to_ts -from metadata.utils.importer import import_from_module from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -64,21 +63,6 @@ STATUS_MAP = { } -class OMSerializedDagDetails(BaseModel): - """ - Custom model we get from the Airflow db - as a scoped SELECT from SerializedDagModel - """ - - dag_id: str - data: Any - fileloc: str - - # We don't have a validator for SerializedDag - class Config: - arbitrary_types_allowed = True - - class OMTaskInstance(BaseModel): """ Custom model we get from the Airflow db @@ -206,7 +190,7 @@ class AirflowSource(PipelineServiceSource): ] def yield_pipeline_status( - self, pipeline_details: SerializedDAG + self, pipeline_details: AirflowDagDetails ) -> OMetaPipelineStatus: try: dag_run_list = self.get_pipeline_status(pipeline_details.dag_id) @@ -251,39 +235,7 @@ class AirflowSource(PipelineServiceSource): " Skipping status ingestion." ) - def get_schedule_interval(self, pipeline_data) -> Optional[str]: - """ - Fetch Schedule Intervals from Airflow Dags - """ - schedule_interval_timetable_val = pipeline_data.get("timetable", {}).get( - "__var", {} - ) - if schedule_interval_timetable_val: - # Fetch Cron as String - return schedule_interval_timetable_val.get("expression", None) - schedule_interval_val = pipeline_data.get("schedule_interval", {}) - if schedule_interval_val: - type_value = schedule_interval_val.get("__type", {}) - if type_value == TIMEDELTA: - var_value = schedule_interval_val.get("__var", {}) - # types of schedule interval with timedelta - # timedelta(days=1) = `1 day, 0:00:00` - return str(timedelta(seconds=var_value)) - - try: - # If the Schedule interval is a const value like @once, @yearly etc - # __type sends the module path, and once instantiated - return import_from_module( - pipeline_data.get("timetable", {}).get("__type", {}) - )().summary - except Exception: - logger.debug(traceback.format_exc()) - logger.warning( - f"Couldn't fetch schedule interval for dag {pipeline_data.get('_dag_id')}" - ) - return None - - def get_pipelines_list(self) -> Iterable[OMSerializedDagDetails]: + def get_pipelines_list(self) -> Iterable[AirflowDagDetails]: """ List all DAGs from the metadata db. 
@@ -302,20 +254,21 @@ class AirflowSource(PipelineServiceSource): SerializedDagModel.fileloc, ).all(): try: - data = serialized_dag[1]["dag"] - dag = AirflowDagDetails( - dag_id=serialized_dag[0], - fileloc=serialized_dag[2], - data=AirflowDag(**serialized_dag[1]), - max_active_runs=data.get("max_active_runs", None), - description=data.get("_description", None), - start_date=data.get("start_date", None), - tasks=data.get("tasks", []), - schedule_interval=self.get_schedule_interval(data), - owners=self.fetch_owners(data), - ) + if serialized_dag[0] in ("example_subdag_operator"): + data = serialized_dag[1]["dag"] + dag = AirflowDagDetails( + dag_id=serialized_dag[0], + fileloc=serialized_dag[2], + data=AirflowDag.parse_obj(serialized_dag[1]), + max_active_runs=data.get("max_active_runs", None), + description=data.get("_description", None), + start_date=data.get("start_date", None), + tasks=data.get("tasks", []), + schedule_interval=get_schedule_interval(data), + owners=self.fetch_owners(data), + ) - yield dag + yield dag except ValidationError as err: logger.debug(traceback.format_exc()) logger.warning( @@ -343,7 +296,8 @@ class AirflowSource(PipelineServiceSource): def get_tasks_from_dag(dag: AirflowDagDetails, host_port: str) -> List[Task]: """ Obtain the tasks from a SerializedDAG - :param dag: SerializedDAG + :param dag: AirflowDagDetails + :param host_port: service host :return: List of tasks """ return [ @@ -363,18 +317,6 @@ class AirflowSource(PipelineServiceSource): for task in cast(Iterable[BaseOperator], dag.tasks) ] - @staticmethod - def _build_dag(data: Any) -> SerializedDAG: - """ - Use the queried data to fetch the DAG - :param data: from SQA query - :return: SerializedDAG - """ - if isinstance(data, dict): - return SerializedDAG.from_dict(data) - - return SerializedDAG.from_json(data) - def get_user_details(self, email) -> Optional[EntityReference]: user = self.metadata.get_user_by_email(email=email) if user: @@ -390,7 +332,7 @@ class AirflowSource(PipelineServiceSource): for owner in owners or []: return self.get_user_details(email=owner) - logger.debug(f"No user for found for email {owners} in OMD") + logger.debug(f"No user found with email [{owners}] in OMD") except Exception as exc: logger.warning(f"Error while getting details of user {owners} - {exc}") return None @@ -439,52 +381,6 @@ class AirflowSource(PipelineServiceSource): logger.debug(traceback.format_exc()) logger.warning(f"Wild error ingesting pipeline {pipeline_details} - {err}") - @staticmethod - def parse_xlets(xlet: Optional[List[Any]]) -> Optional[List[str]]: - """ - Parse airflow xlets for 2.1.4. 
E.g., - - [{'__var': {'tables': ['sample_data.ecommerce_db.shopify.fact_order']}, - '__type': 'dict'}] - - :param xlet: airflow v2 xlet dict - :return: table FQN list or None - """ - if xlet and len(xlet) and isinstance(xlet[0], dict): - tables = xlet[0].get("__var").get("tables") - if tables and isinstance(tables, list): - return tables - - return None - - def get_inlets(self, task: BaseOperator) -> Optional[List[str]]: - """ - Get inlets from serialised operator - :param task: SerializedBaseOperator - :return: maybe an inlet list - """ - inlets = task.get_inlet_defs() - try: - return self.parse_xlets(inlets) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Error trying to parse inlets: {exc}") - return None - - def get_outlets(self, task: BaseOperator) -> Optional[List[str]]: - """ - Get outlets from serialised operator - :param task: SerializedBaseOperator - :return: maybe an inlet list - """ - outlets = task.get_outlet_defs() - try: - return self.parse_xlets(outlets) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Error trying to parse outlets: {exc}") - return None - def yield_pipeline_lineage_details( self, pipeline_details: AirflowDagDetails ) -> Optional[Iterable[AddLineageRequest]]: diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/models.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/models.py index 07596888e40..a9dae7fbd6e 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/models.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/models.py @@ -42,6 +42,10 @@ class Task(BaseModel): start_date: Optional[datetime] end_date: Optional[datetime] + # Allow picking up data from key `inlets` and `_inlets` + class Config: + allow_population_by_field_name = True + class TaskList(BaseModel): __root__: List[Task] diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/utils.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/utils.py new file mode 100644 index 00000000000..6ea08d01787 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/utils.py @@ -0,0 +1,63 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""
+Airflow metadata utils
+"""
+
+import traceback
+from datetime import timedelta
+from typing import Any, Dict, Optional
+
+from metadata.utils.constants import TIMEDELTA
+from metadata.utils.importer import import_from_module
+from metadata.utils.logger import ingestion_logger
+
+logger = ingestion_logger()
+
+
+def get_schedule_interval(pipeline_data: Dict[str, Any]) -> Optional[str]:
+    """
+    Fetch Schedule Intervals from Airflow DAGs
+    """
+    try:
+        timetable, schedule = pipeline_data.get("timetable", {}), pipeline_data.get(
+            "schedule_interval", {}
+        )
+
+        if timetable:
+            # Fetch Cron as String
+            expression = timetable.get("__var", {}).get("expression")
+            if expression:
+                return expression
+
+            expression_class = timetable.get("__type")
+            if expression_class:
+                return import_from_module(expression_class)().summary
+
+        if schedule:
+            if isinstance(schedule, str):
+                return schedule
+            type_value = schedule.get("__type")
+            if type_value == TIMEDELTA:
+                var_value = schedule.get("__var", {})
+                # A timedelta interval is serialized as seconds, e.g.
+                # timedelta(days=1) = `1 day, 0:00:00`
+                return str(timedelta(seconds=var_value))
+
+        # If there is neither a timetable nor a schedule, the DAG has no interval set
+        return None
+
+    except Exception as exc:
+        logger.debug(traceback.format_exc())
+        logger.warning(
+            f"Couldn't fetch schedule interval for dag {pipeline_data.get('_dag_id')}: [{exc}]"
+        )
+        return None
diff --git a/ingestion/tests/unit/topology/pipeline/test_airflow.py b/ingestion/tests/unit/topology/pipeline/test_airflow.py
new file mode 100644
index 00000000000..bd5f402b404
--- /dev/null
+++ b/ingestion/tests/unit/topology/pipeline/test_airflow.py
@@ -0,0 +1,200 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" +Test Airflow processing +""" +from unittest import TestCase + +from metadata.ingestion.source.pipeline.airflow.models import ( + AirflowDag, + AirflowDagDetails, +) +from metadata.ingestion.source.pipeline.airflow.utils import get_schedule_interval + + +class TestAirflow(TestCase): + """ + Test Airflow model processing + """ + + def test_parsing(self): + """ + We can properly pick up Airflow's payload and convert + it to our models + """ + + serialized_dag = { + "__version": 1, + "dag": { + "_dag_id": "test-lineage-253", + "fileloc": "/opt/airflow/dags/lineage-test.py", + "default_args": { + "__var": { + "owner": "airflow", + "depends_on_past": False, + "email": ["airflow@example.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": {"__var": 1, "__type": "timedelta"}, + }, + "__type": "dict", + }, + "timezone": "UTC", + "catchup": False, + "edge_info": {}, + "dataset_triggers": [], + "_description": "An example DAG which simulate dbt run of fct_application_summary for airflow lineage backend", + "_task_group": { + "_group_id": None, + "prefix_group_id": True, + "tooltip": "", + "ui_color": "CornflowerBlue", + "ui_fgcolor": "#000", + "children": { + "task0": ["operator", "task0"], + "task1": ["operator", "task1"], + }, + "upstream_group_ids": [], + "downstream_group_ids": [], + "upstream_task_ids": [], + "downstream_task_ids": [], + }, + "is_paused_upon_creation": False, + "start_date": 1688860800, + "schedule_interval": None, + "_processor_dags_folder": "/opt/airflow/dags", + "tasks": [ + { + "owner": "airflow", + "retry_delay": 1, + "retries": 1, + "ui_color": "#e8f7e4", + "email": ["airflow@example.com"], + "task_id": "task0", + "email_on_failure": False, + "email_on_retry": False, + "pool": "default_pool", + "downstream_task_ids": ["task1"], + "template_ext": [], + "template_fields_renderers": {}, + "inlets": [ + { + "__var": { + "tables": [ + "sample_data.ecommerce_db.shopify.dim_location" + ] + }, + "__type": "dict", + } + ], + "template_fields": [], + "ui_fgcolor": "#000", + "_task_type": "EmptyOperator", + "_task_module": "airflow.operators.empty", + "_is_empty": True, + }, + { + "outlets": [ + { + "__var": { + "tables": [ + "sample_data.ecommerce_db.shopify.dim_staff" + ] + }, + "__type": "dict", + } + ], + "owner": "airflow", + "retry_delay": 1, + "retries": 1, + "ui_color": "#e8f7e4", + "email": ["airflow@example.com"], + "task_id": "task1", + "email_on_failure": False, + "email_on_retry": False, + "pool": "default_pool", + "downstream_task_ids": [], + "template_ext": [], + "template_fields_renderers": {}, + "template_fields": [], + "ui_fgcolor": "#000", + "_task_type": "EmptyOperator", + "_task_module": "airflow.operators.empty", + "_is_empty": True, + }, + ], + "dag_dependencies": [], + "params": {}, + }, + } + + data = serialized_dag["dag"] + + dag = AirflowDagDetails( + dag_id="id", + fileloc="loc", + data=AirflowDag.parse_obj(serialized_dag), + max_active_runs=data.get("max_active_runs", None), + description=data.get("_description", None), + start_date=data.get("start_date", None), + tasks=data.get("tasks", []), + schedule_interval=None, + owners=None, + ) + + self.assertEquals( + dag.tasks[0].inlets, + [ + { + "__var": { + "tables": ["sample_data.ecommerce_db.shopify.dim_location"] + }, + "__type": "dict", + } + ], + ) + self.assertEquals( + dag.tasks[1].outlets, + [ + { + "__var": {"tables": ["sample_data.ecommerce_db.shopify.dim_staff"]}, + "__type": "dict", + } + ], + ) + + def test_get_schedule_interval(self): + """ + 
Check the shape of different DAGs + """ + + pipeline_data = {"schedule_interval": None} + self.assertEquals(get_schedule_interval(pipeline_data), None) + + pipeline_data = {"schedule_interval": {"__var": 86400.0, "__type": "timedelta"}} + self.assertEquals(get_schedule_interval(pipeline_data), "1 day, 0:00:00") + + pipeline_data = { + "timetable": { + "__type": "airflow.timetables.simple.OnceTimetable", + "__var": {}, + } + } + self.assertEquals(get_schedule_interval(pipeline_data), "@once") + + pipeline_data = { + "timetable": { + "__type": "airflow.timetables.interval.CronDataIntervalTimetable", + "__var": {"expression": "*/2 * * * *", "timezone": "UTC"}, + } + } + self.assertEquals(get_schedule_interval(pipeline_data), "*/2 * * * *") diff --git a/openmetadata-docs/content/v1.1.0/connectors/pipeline/airflow/mwaa.md b/openmetadata-docs/content/v1.1.0/connectors/pipeline/airflow/mwaa.md index 7202f9ea158..9ae90e6f73c 100644 --- a/openmetadata-docs/content/v1.1.0/connectors/pipeline/airflow/mwaa.md +++ b/openmetadata-docs/content/v1.1.0/connectors/pipeline/airflow/mwaa.md @@ -579,11 +579,8 @@ For Airflow providers, you will want to pull the provider versions from [the mat Also note that the ingestion workflow function must be entirely self contained as it will run by itself in the virtualenv. Any imports it needs, including the configuration, must exist within the function itself. -<<<<<<<< HEAD:openmetadata-docs/content/v1.1.0/connectors/ingestion/deployment/running-from-mwaa.md ### Extracting MWAA Metadata -======== ->>>>>>>> docs-sso-airflow:openmetadata-docs/content/v1.1.0/connectors/pipeline/airflow/mwaa.md As the ingestion process will be happening locally in MWAA, we can prepare a DAG with the following YAML configuration:
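
The YAML configuration that the MWAA sentence refers to lies outside the diff context above. For orientation only, a self-contained ingestion DAG for this setup typically looks like the sketch below; the service name, host/port values, JWT token, and schedule are placeholders rather than values taken from this patch, and the `Workflow` import path is assumed to match the 1.1.x ingestion package layout.

```python
# Minimal sketch of an MWAA metadata-extraction DAG. All connection values
# (serviceName, hostPort, jwtToken) are illustrative placeholders.
from datetime import datetime, timedelta

import yaml
from airflow import DAG
from airflow.operators.python import PythonOperator

config = """
source:
  type: airflow
  serviceName: airflow_mwaa
  serviceConnection:
    config:
      type: Airflow
      hostPort: http://localhost:8080
      numberOfStatus: 10
      connection:
        type: Backend
  sourceConfig:
    config:
      type: PipelineMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: https://<openmetadata-host>/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: <token>
"""


def metadata_ingestion_workflow():
    # Imports live inside the function so it stays fully self-contained
    # in the virtualenv, as the guide above requires.
    # Assumed import path for the 1.1.x ingestion package.
    from metadata.ingestion.api.workflow import Workflow

    workflow_config = yaml.safe_load(config)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "airflow_metadata_extraction",
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2023, 7, 11),
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
```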
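For context on the xlet payloads exercised by the new `test_airflow.py` fixture, this is roughly how a DAG author attaches that lineage information to tasks. The DAG id and table FQNs below are illustrative samples mirroring the test data, not values from any real service; Airflow serializes the dict-valued `inlets`/`outlets` into the `{"__var": ..., "__type": "dict"}` shape that `parse_xlets` and the `Task` model consume.

```python
# Illustrative sketch only - dag_id and table FQNs are made-up sample values.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="lineage_example",
    start_date=datetime(2023, 7, 11),
    # Serialized by Airflow as {"__type": "timedelta", "__var": 86400.0}
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:
    # Airflow serializes these dicts as
    # [{"__var": {"tables": [...]}, "__type": "dict"}],
    # the shape asserted in test_parsing above.
    task0 = EmptyOperator(
        task_id="task0",
        inlets={"tables": ["sample_data.ecommerce_db.shopify.dim_location"]},
    )
    task1 = EmptyOperator(
        task_id="task1",
        outlets={"tables": ["sample_data.ecommerce_db.shopify.dim_staff"]},
    )
    task0 >> task1
```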