From a7328ccf1b9e38be2b1e01cb142e6c3311886c34 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 28 Apr 2022 12:20:03 +0200 Subject: [PATCH] Fix #4491 - PipelineIngestion to support debug (#4564) --- .../jdbi3/IngestionPipelineRepository.java | 12 ++++++++-- .../IngestionPipelineResource.java | 1 + .../createIngestionPipeline.json | 5 +++++ .../ingestionPipelines/ingestionPipeline.json | 5 +++++ .../schema/metadataIngestion/workflow.json | 15 +++++++++++-- .../IngestionPipelineResourceTest.java | 6 +++++ .../workflows/ingestion/common.py | 22 +++++++++++++++++++ .../workflows/ingestion/metadata.py | 6 ++--- .../openmetadata/workflows/ingestion/usage.py | 6 ++--- 9 files changed, 66 insertions(+), 12 deletions(-) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java index f615d362536..93723eee2dd 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java @@ -24,6 +24,7 @@ import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig; import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; import org.openmetadata.catalog.entity.services.ingestionPipelines.Source; +import org.openmetadata.catalog.metadataIngestion.LogLevels; import org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResource; import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection; import org.openmetadata.catalog.type.ChangeDescription; @@ -35,8 +36,8 @@ import org.openmetadata.catalog.util.FullyQualifiedName; import org.openmetadata.catalog.util.JsonUtils; public class IngestionPipelineRepository extends EntityRepository { - private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,source,airflowConfig"; - private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,source,airflowConfig"; + private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,source,airflowConfig,loggerLevel"; + private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,source,airflowConfig,loggerLevel"; public IngestionPipelineRepository(CollectionDAO dao) { super( @@ -252,6 +253,7 @@ public class IngestionPipelineRepository extends EntityRepository WorkflowConfig: + """ + Prepare the workflow config with logLevels and openMetadataServerConfig + :param ingestion_pipeline: Received payload from REST + :return: WorkflowConfig + """ + return WorkflowConfig( + loggerLevel=ingestion_pipeline.loggerLevel, + openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection, + ) + + def build_default_args() -> Dict[str, Any]: """ Build the default_args dict to be passed diff --git a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/metadata.py b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/metadata.py index d97242dc741..5758fa89333 100644 --- a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/metadata.py +++ b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/metadata.py @@ -15,6 +15,7 @@ Metadata DAG function builder from airflow import DAG from openmetadata.workflows.ingestion.common import ( build_dag, + build_workflow_config_property, metadata_ingestion_workflow, ) @@ -29,7 +30,6 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, Sink, - WorkflowConfig, ) @@ -45,9 +45,7 @@ def build_metadata_workflow_config( type="metadata-rest", config={}, ), - workflowConfig=WorkflowConfig( - openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection - ), + workflowConfig=build_workflow_config_property(ingestion_pipeline), ) return workflow_config diff --git a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py index 09830e41735..255ca9337f7 100644 --- a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py +++ b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py @@ -16,6 +16,7 @@ Metadata DAG function builder from airflow import DAG from openmetadata.workflows.ingestion.common import ( build_dag, + build_workflow_config_property, metadata_ingestion_workflow, ) @@ -24,7 +25,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, Processor, Stage, - WorkflowConfig, ) try: @@ -58,9 +58,7 @@ def build_usage_workflow_config( bulkSink=BulkSink( type="metadata-usage", config={"filename": tmp_file.name} ), - workflowConfig=WorkflowConfig( - openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection - ), + workflowConfig=build_workflow_config_property(ingestion_pipeline), ) return workflow_config