From ed698dc40f335c62692c13b5ff5292035341ab93 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Fri, 10 Sep 2021 22:28:23 -0700 Subject: [PATCH] Fix #466: Fix airflow example dag (#467) --- .../airflow/sample_tables_airflow_example.py | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/ingestion/examples/airflow/sample_tables_airflow_example.py b/ingestion/examples/airflow/sample_tables_airflow_example.py index e3ab749a847..c735d949ffb 100644 --- a/ingestion/examples/airflow/sample_tables_airflow_example.py +++ b/ingestion/examples/airflow/sample_tables_airflow_example.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import pathlib from datetime import timedelta from airflow import DAG @@ -23,28 +23,32 @@ except ModuleNotFoundError: from metadata.config.common import load_config_file from metadata.ingestion.api.workflow import Workflow +from airflow.utils.dates import days_ago default_args = { "owner": "user_name", "email": ["username@org.com"], - "email_on_failure": True, + "email_on_failure": False, "retries": 3, "retry_delay": timedelta(minutes=5), - "execution_timeout": timedelta(minutes=60), + "execution_timeout": timedelta(minutes=60) } def metadata_ingestion_workflow(): - config = load_config_file("examples/workflows/hive.json") - workflow = Workflow.create(config) - workflow.run() + config_file = pathlib.Path("/tmp/sample_data.json") + workflow_config = load_config_file(config_file) + if workflow_config.get('cron'): + del workflow_config['cron'] + workflow = Workflow.create(workflow_config) + workflow.execute() workflow.raise_from_status() workflow.print_status() workflow.stop() with DAG( - "hive_metadata_ingestion_workflow" + "sample_metadata_ingestion_workflow", default_args=default_args, description="An example DAG which runs a OpenMetadata ingestion workflow", schedule_interval=timedelta(days=1), @@ -53,5 +57,5 @@ with DAG( ) as dag: ingest_task = PythonOperator( task_id="ingest_using_recipe", - python_callable=metadata_ingestion_workflow(), + python_callable=metadata_ingestion_workflow, ) \ No newline at end of file