From e876d01841d19ce86212cbfaf46d374f63c88a3f Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Wed, 20 Apr 2022 14:10:40 +0200 Subject: [PATCH] prepare usage config (#4279) --- .../openmetadata/workflows/ingestion/usage.py | 32 +++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py index f26e9bbccfd..e8d4b79356f 100644 --- a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py +++ b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py @@ -12,26 +12,52 @@ Metadata DAG function builder """ -from typing import Any, Dict from airflow import DAG from openmetadata.workflows.ingestion.common import build_ingestion_dag +from metadata.generated.schema.metadataIngestion.workflow import ( + BulkSink, + OpenMetadataWorkflowConfig, + Processor, + Stage, + WorkflowConfig, +) + try: from airflow.operators.python import PythonOperator except ModuleNotFoundError: from airflow.operators.python_operator import PythonOperator +import tempfile + from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( IngestionPipeline, ) -def build_usage_workflow_config(airflow_pipeline: IngestionPipeline) -> Dict[str, Any]: +def build_usage_workflow_config( + ingestion_pipeline: IngestionPipeline, +) -> OpenMetadataWorkflowConfig: """ Given an airflow_pipeline, prepare the workflow config JSON """ - ... + + with tempfile.NamedTemporaryFile() as tmp_file: + + workflow_config = OpenMetadataWorkflowConfig( + source=ingestion_pipeline.source, + processor=Processor(type="query-parser", config={"filter": ""}), + stage=Stage(type="table-usage", config={"filename": tmp_file.name}), + bulkSink=BulkSink( + type="metadata-usage", config={"filename": tmp_file.name} + ), + workflowConfig=WorkflowConfig( + openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection + ), + ) + + return workflow_config def build_usage_dag(airflow_pipeline: IngestionPipeline) -> DAG: