Fix #4828 - Remove lineage backend from airflow image (#4833)

* Remove callbacks from DAG

* Remove lineage backend from image
This commit is contained in:
Pere Miquel Brull 2022-05-11 07:36:33 +02:00 committed by GitHub
parent 79b10081d7
commit 23b95a267b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 1076 additions and 20 deletions

View File

@ -31,7 +31,7 @@ RUN pip install ".[${INGESTION_DEPENDENCY}]"
RUN airflow db init RUN airflow db init
RUN cp -r /ingestion/examples/airflow/airflow.cfg /airflow/airflow.cfg RUN cp -r /ingestion/airflow.cfg /airflow/airflow.cfg
RUN cp -r /om-airflow/plugins /airflow/plugins RUN cp -r /om-airflow/plugins /airflow/plugins
RUN cp -r /om-airflow/plugins/dag_templates /airflow/ RUN cp -r /om-airflow/plugins/dag_templates /airflow/
RUN mkdir -p /airflow/dag_generated_configs RUN mkdir -p /airflow/dag_generated_configs

1074
ingestion/airflow.cfg Normal file

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,7 @@ Metadata DAG common functions
""" """
import json import json
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Optional from typing import Callable, Optional
from airflow import DAG from airflow import DAG
@ -27,10 +27,6 @@ try:
except ModuleNotFoundError: except ModuleNotFoundError:
from airflow.operators.python_operator import PythonOperator from airflow.operators.python_operator import PythonOperator
from airflow_provider_openmetadata.lineage.callback import (
failure_callback,
success_callback,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline, IngestionPipeline,
) )
@ -109,19 +105,6 @@ def build_workflow_config_property(
) )
def build_default_args() -> Dict[str, Any]:
"""
Build the default_args dict to be passed
to the DAG regardless of the airflow_pipeline
payload.
"""
return {
# Run the lineage backend callbacks to gather the Pipeline info
"on_failure_callback": failure_callback,
"on_success_callback": success_callback,
}
def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict: def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
""" """
Prepare kwargs to send to DAG Prepare kwargs to send to DAG
@ -131,7 +114,6 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
return { return {
"dag_id": ingestion_pipeline.name.__root__, "dag_id": ingestion_pipeline.name.__root__,
"description": ingestion_pipeline.description, "description": ingestion_pipeline.description,
"default_args": build_default_args(),
"start_date": date_to_datetime(ingestion_pipeline.airflowConfig.startDate), "start_date": date_to_datetime(ingestion_pipeline.airflowConfig.startDate),
"end_date": date_to_datetime(ingestion_pipeline.airflowConfig.endDate), "end_date": date_to_datetime(ingestion_pipeline.airflowConfig.endDate),
"concurrency": ingestion_pipeline.airflowConfig.concurrency, "concurrency": ingestion_pipeline.airflowConfig.concurrency,