Fix #4257 - Missing airflow params (#4259)

Fix #4257 - Missing airflow params (#4259)
This commit is contained in:
Pere Miquel Brull 2022-04-20 09:14:27 +02:00 committed by GitHub
parent d30619a81f
commit 098cb865d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 33 deletions

View File

@ -63,10 +63,6 @@
"description": "End Date of the pipeline.",
"$ref": "../../../type/basic.json#/definitions/date"
},
"nextExecutionDate": {
"description": "Next execution date from the underlying pipeline platform once the pipeline is scheduled.",
"$ref": "../../../type/basic.json#/definitions/date"
},
"pipelineTimezone": {
"description": "Timezone in which the pipeline is going to be scheduled.",
"type": "string",
@ -91,11 +87,6 @@
"description": "Scheduler Interval for the pipeline in cron format.",
"type": "string"
},
"pipelineTimeout": {
"description": "Timeout for the pipeline in seconds.",
"type": "integer",
"default": 60
},
"maxActiveRuns": {
"description": "Maximum Number of active runs.",
"type": "integer",
@ -104,7 +95,7 @@
"workflowTimeout": {
"description": "Timeout for the workflow in seconds.",
"type": "integer",
"default": 60
"default": null
},
"workflowDefaultView": {
"description": "Default view in Airflow.",
@ -116,14 +107,6 @@
"type": "string",
"default": "LR"
},
"pythonOperatorLocation": {
"description": "File system directory path where managed python operator files are stored.",
"type": "string"
},
"slaMissCallback": {
"description": "Python method callback to invoke on SLA miss.",
"type": "string"
},
"email": {
"description": "Email to notify workflow status.",
"$ref": "../../../type/basic.json#/definitions/email"

View File

@ -12,8 +12,8 @@
Metadata DAG common functions
"""
import json
from datetime import datetime
from typing import Any, Dict
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from airflow import DAG
@ -55,14 +55,16 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):
workflow.stop()
def date_to_datetime(
    date: Optional[basic.Date], date_format: str = "%Y-%m-%d"
) -> Optional[datetime]:
    """
    Convert a basic.Date wrapper coming from the OM server payload
    into a datetime.

    :param date: wrapped date value (e.g., airflowConfig.startDate);
        may be None when the field is not set on the pipeline config.
    :param date_format: strptime format the wrapped value is expected in.
    :return: parsed datetime, or None when no date was provided.
    """
    if date is None:
        # Unset dates (e.g., no endDate configured) stay as None so callers
        # can pass the result straight through as a DAG kwarg.
        return None
    # basic.Date stores its value in __root__; stringify before parsing.
    return datetime.strptime(str(date.__root__), date_format)
def build_default_args() -> Dict[str, Any]:
@ -78,6 +80,32 @@ def build_default_args() -> Dict[str, Any]:
}
def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
    """
    Prepare the kwargs to pass to the DAG constructor.

    :param ingestion_pipeline: pipeline configs coming from the OM server
    :return: dict to use as DAG(**kwargs)
    """
    return {
        "dag_id": ingestion_pipeline.name.__root__,
        "description": ingestion_pipeline.description,
        "default_args": build_default_args(),
        "start_date": date_to_datetime(ingestion_pipeline.airflowConfig.startDate),
        "end_date": date_to_datetime(ingestion_pipeline.airflowConfig.endDate),
        "concurrency": ingestion_pipeline.airflowConfig.concurrency,
        "max_active_runs": ingestion_pipeline.airflowConfig.maxActiveRuns,
        "default_view": ingestion_pipeline.airflowConfig.workflowDefaultView,
        "orientation": ingestion_pipeline.airflowConfig.workflowDefaultViewOrientation,
        # workflowTimeout is declared in *seconds* in the JSON schema; passing
        # it positionally would make timedelta interpret it as days.
        "dagrun_timeout": timedelta(
            seconds=ingestion_pipeline.airflowConfig.workflowTimeout
        )
        if ingestion_pipeline.airflowConfig.workflowTimeout
        else None,
        "is_paused_upon_creation": ingestion_pipeline.airflowConfig.pausePipeline
        or False,
        "catchup": ingestion_pipeline.airflowConfig.pipelineCatchup or False,
        "schedule_interval": ingestion_pipeline.airflowConfig.scheduleInterval,
    }
def build_ingestion_dag(
task_name: str,
ingestion_pipeline: IngestionPipeline,
@ -87,19 +115,14 @@ def build_ingestion_dag(
Build a simple metadata workflow DAG
"""
with DAG(
dag_id=ingestion_pipeline.name.__root__,
default_args=build_default_args(),
description=ingestion_pipeline.description,
start_date=get_start_date(ingestion_pipeline),
is_paused_upon_creation=ingestion_pipeline.airflowConfig.pausePipeline or False,
catchup=ingestion_pipeline.airflowConfig.pipelineCatchup or False,
) as dag:
with DAG(**build_dag_configs(ingestion_pipeline)) as dag:
PythonOperator(
task_id=task_name,
python_callable=metadata_ingestion_workflow,
op_kwargs={"workflow_config": workflow_config},
retries=ingestion_pipeline.airflowConfig.retries,
retry_delay=ingestion_pipeline.airflowConfig.retryDelay,
)
return dag