From c8f4d496a5ef69c74241d27e7f268e2a3bf56294 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 28 Oct 2022 16:11:23 +0200 Subject: [PATCH] Airflow exception management (#8410) * Airflow exception management * Airflow exception management --- .../ingestion/source/pipeline/airflow.py | 43 +++++++++++++------ 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py index 182679bd18d..e36e9819475 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py @@ -18,7 +18,7 @@ from typing import Any, Iterable, List, Optional, cast from airflow.models import BaseOperator, DagRun, TaskInstance from airflow.models.serialized_dag import SerializedDagModel from airflow.serialization.serialized_objects import SerializedDAG -from pydantic import BaseModel +from pydantic import BaseModel, ValidationError from sqlalchemy.orm import Session from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest @@ -320,19 +320,34 @@ class AirflowSource(PipelineServiceSource): :return: Create Pipeline request with tasks """ - dag: SerializedDAG = self._build_dag(pipeline_details.data) - yield CreatePipelineRequest( - name=pipeline_details.dag_id, - description=dag.description, - pipelineUrl=f"/tree?dag_id={dag.dag_id}", # Just the suffix - concurrency=dag.concurrency, - pipelineLocation=pipeline_details.fileloc, - startDate=dag.start_date.isoformat() if dag.start_date else None, - tasks=self.get_tasks_from_dag(dag), - service=EntityReference( - id=self.context.pipeline_service.id.__root__, type="pipelineService" - ), - ) + try: + dag: SerializedDAG = self._build_dag(pipeline_details.data) + yield CreatePipelineRequest( + name=pipeline_details.dag_id, + description=dag.description, + pipelineUrl=f"/tree?dag_id={dag.dag_id}", # Just the suffix + concurrency=dag.concurrency, + pipelineLocation=pipeline_details.fileloc, + startDate=dag.start_date.isoformat() if dag.start_date else None, + tasks=self.get_tasks_from_dag(dag), + service=EntityReference( + id=self.context.pipeline_service.id.__root__, type="pipelineService" + ), + ) + except TypeError as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error building DAG information from {pipeline_details}. There might be Airflow version" + f" incompatibilities - {err}" + ) + except ValidationError as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error building pydantic model for {pipeline_details} - {err}" + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.warning(f"Wild error ingesting pipeline {pipeline_details} - {err}") @staticmethod def parse_xlets(xlet: List[Any]) -> Optional[List[str]]: