mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-01 11:09:14 +00:00
prepare usage config (#4279)
This commit is contained in:
parent
596f3ed9ca
commit
e876d01841
@ -12,26 +12,52 @@
|
||||
Metadata DAG function builder
|
||||
"""
|
||||
|
||||
from typing import Any, Dict
|
||||
|
||||
from airflow import DAG
|
||||
from openmetadata.workflows.ingestion.common import build_ingestion_dag
|
||||
|
||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
BulkSink,
|
||||
OpenMetadataWorkflowConfig,
|
||||
Processor,
|
||||
Stage,
|
||||
WorkflowConfig,
|
||||
)
|
||||
|
||||
try:
|
||||
from airflow.operators.python import PythonOperator
|
||||
except ModuleNotFoundError:
|
||||
from airflow.operators.python_operator import PythonOperator
|
||||
|
||||
import tempfile
|
||||
|
||||
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
|
||||
IngestionPipeline,
|
||||
)
|
||||
|
||||
|
||||
def build_usage_workflow_config(airflow_pipeline: IngestionPipeline) -> Dict[str, Any]:
|
||||
def build_usage_workflow_config(
|
||||
ingestion_pipeline: IngestionPipeline,
|
||||
) -> OpenMetadataWorkflowConfig:
|
||||
"""
|
||||
Given an airflow_pipeline, prepare the workflow config JSON
|
||||
"""
|
||||
...
|
||||
|
||||
with tempfile.NamedTemporaryFile() as tmp_file:
|
||||
|
||||
workflow_config = OpenMetadataWorkflowConfig(
|
||||
source=ingestion_pipeline.source,
|
||||
processor=Processor(type="query-parser", config={"filter": ""}),
|
||||
stage=Stage(type="table-usage", config={"filename": tmp_file.name}),
|
||||
bulkSink=BulkSink(
|
||||
type="metadata-usage", config={"filename": tmp_file.name}
|
||||
),
|
||||
workflowConfig=WorkflowConfig(
|
||||
openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection
|
||||
),
|
||||
)
|
||||
|
||||
return workflow_config
|
||||
|
||||
|
||||
def build_usage_dag(airflow_pipeline: IngestionPipeline) -> DAG:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user