From bb4ac68ee354caefb2a3adc777ae2b2368958257 Mon Sep 17 00:00:00 2001
From: Pere Miquel Brull
Date: Fri, 24 Jun 2022 20:01:50 +0200
Subject: [PATCH] Add backend connection for airflow dag extraction

---
 .../dags/airflow_metadata_extraction.py       | 96 +++++++++++++++++++
 .../ingestion/source/pipeline/airflow.py      |  1 -
 ingestion/src/metadata/utils/connections.py   | 19 ++++
 3 files changed, 115 insertions(+), 1 deletion(-)
 create mode 100644 ingestion/examples/airflow/dags/airflow_metadata_extraction.py

diff --git a/ingestion/examples/airflow/dags/airflow_metadata_extraction.py b/ingestion/examples/airflow/dags/airflow_metadata_extraction.py
new file mode 100644
index 00000000000..19a2f5dbc3e
--- /dev/null
+++ b/ingestion/examples/airflow/dags/airflow_metadata_extraction.py
@@ -0,0 +1,96 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This DAG can be used directly in your Airflow instance after installing
+the `openmetadata-ingestion[airflow-container]` package. Its purpose
+is to connect to the underlying database, retrieve the information
+and push it to OpenMetadata.
+"""
+from datetime import timedelta
+
+import yaml
+from airflow import DAG
+
+# Fall back to the legacy module path when airflow.operators.python is missing
+try:
+    from airflow.operators.python import PythonOperator
+except ModuleNotFoundError:
+    from airflow.operators.python_operator import PythonOperator
+
+from airflow.utils.dates import days_ago
+
+from metadata.ingestion.api.workflow import Workflow
+
+default_args = {
+    "owner": "user_name",
+    "email": ["username@org.com"],
+    "email_on_failure": False,
+    "retries": 3,
+    "retry_delay": timedelta(minutes=5),
+    "execution_timeout": timedelta(minutes=60),
+}
+
+config = """
+source:
+  type: airflow
+  serviceName: airflow_source
+  serviceConnection:
+    config:
+      type: Airflow
+      hostPort: http://localhost:8080
+      numberOfStatus: 10
+      connection:
+        type: Backend
+  sourceConfig:
+    config:
+      type: PipelineMetadata
+sink:
+  type: metadata-rest
+  config: {}
+workflowConfig:
+  loggerLevel: INFO
+  openMetadataServerConfig:
+    hostPort: http://localhost:8585/api
+    authProvider: no-auth
+"""
+
+
+def metadata_ingestion_workflow():
+    """
+    Build the ingestion Workflow from the YAML recipe above and run it.
+
+    `stop()` sits in a `finally` block so the workflow is always shut
+    down, even when `execute()` or `raise_from_status()` raises.
+    """
+    workflow_config = yaml.safe_load(config)
+    workflow = Workflow.create(workflow_config)
+    try:
+        workflow.execute()
+        workflow.raise_from_status()
+        workflow.print_status()
+    finally:
+        workflow.stop()
+
+
+with DAG(
+    "airflow_metadata_extraction",
+    default_args=default_args,
+    description="An example DAG which pushes Airflow data to OM",
+    start_date=days_ago(1),
+    is_paused_upon_creation=True,
+    schedule_interval="*/5 * * * *",
+    catchup=False,
+) as dag:
+    ingest_task = PythonOperator(
+        task_id="ingest_using_recipe",
+        python_callable=metadata_ingestion_workflow,
+    )
diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py
index 741102b2cb8..5324aad5c31 100644
--- a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py
+++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py
@@ -154,7 +154,6 @@ class AirflowSource(Source[CreatePipelineRequest]):
     ) -> OMetaPipelineStatus:
         dag_run_list = self.get_pipeline_status(serialized_dag.dag_id)
         for dag in dag_run_list:
-            tasks = []
             if isinstance(dag.task_instances, Iterable):
                 tasks = dag.task_instances
             else:
diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py
index 409a237419b..a6d0d26283f 100644
--- a/ingestion/src/metadata/utils/connections.py
+++ b/ingestion/src/metadata/utils/connections.py
@@ -86,6 +86,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.airbyteConne
 from metadata.generated.schema.entity.services.connections.pipeline.airflowConnection import (
     AirflowConnection,
 )
+from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import (
+    BackendConnection,
+)
 from metadata.orm_profiler.orm.functions.conn_test import ConnTestFn
 from metadata.utils.connection_clients import (
     AirByteClient,
@@ -697,3 +700,19 @@ def _(connection: ModeClient) -> None:
         raise SourceConnectionException(
             f"Unknown error connecting with {connection} - {err}."
         )
+
+
+@get_connection.register
+def _(_: BackendConnection, verbose: bool = False):
+    """
+    Return the bind of a session opened from Airflow's settings.
+
+    The connection argument is unused beyond selecting this handler,
+    and `verbose` is accepted but ignored.
+    """
+    # Function-scope import: airflow is only imported when this
+    # handler actually runs
+    from airflow import settings
+
+    with settings.Session() as session:
+        return session.get_bind()