Airflow exception management (#8410)

* Airflow exception management

* Airflow exception management
This commit is contained in:
Pere Miquel Brull 2022-10-28 16:11:23 +02:00 committed by GitHub
parent f027c26cc1
commit c8f4d496a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -18,7 +18,7 @@ from typing import Any, Iterable, List, Optional, cast
from airflow.models import BaseOperator, DagRun, TaskInstance
from airflow.models.serialized_dag import SerializedDagModel
from airflow.serialization.serialized_objects import SerializedDAG
from pydantic import BaseModel
from pydantic import BaseModel, ValidationError
from sqlalchemy.orm import Session
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
@ -320,19 +320,34 @@ class AirflowSource(PipelineServiceSource):
:return: Create Pipeline request with tasks
"""
dag: SerializedDAG = self._build_dag(pipeline_details.data)
yield CreatePipelineRequest(
name=pipeline_details.dag_id,
description=dag.description,
pipelineUrl=f"/tree?dag_id={dag.dag_id}", # Just the suffix
concurrency=dag.concurrency,
pipelineLocation=pipeline_details.fileloc,
startDate=dag.start_date.isoformat() if dag.start_date else None,
tasks=self.get_tasks_from_dag(dag),
service=EntityReference(
id=self.context.pipeline_service.id.__root__, type="pipelineService"
),
)
try:
dag: SerializedDAG = self._build_dag(pipeline_details.data)
yield CreatePipelineRequest(
name=pipeline_details.dag_id,
description=dag.description,
pipelineUrl=f"/tree?dag_id={dag.dag_id}", # Just the suffix
concurrency=dag.concurrency,
pipelineLocation=pipeline_details.fileloc,
startDate=dag.start_date.isoformat() if dag.start_date else None,
tasks=self.get_tasks_from_dag(dag),
service=EntityReference(
id=self.context.pipeline_service.id.__root__, type="pipelineService"
),
)
except TypeError as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Error building DAG information from {pipeline_details}. There might be Airflow version"
f" incompatibilities - {err}"
)
except ValidationError as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Error building pydantic model for {pipeline_details} - {err}"
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(f"Wild error ingesting pipeline {pipeline_details} - {err}")
@staticmethod
def parse_xlets(xlet: List[Any]) -> Optional[List[str]]: