diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java index f615d362536..93723eee2dd 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java @@ -24,6 +24,7 @@ import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig; import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; import org.openmetadata.catalog.entity.services.ingestionPipelines.Source; +import org.openmetadata.catalog.metadataIngestion.LogLevels; import org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResource; import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection; import org.openmetadata.catalog.type.ChangeDescription; @@ -35,8 +36,8 @@ import org.openmetadata.catalog.util.FullyQualifiedName; import org.openmetadata.catalog.util.JsonUtils; public class IngestionPipelineRepository extends EntityRepository { - private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,source,airflowConfig"; - private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,source,airflowConfig"; + private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,source,airflowConfig,loggerLevel"; + private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,source,airflowConfig,loggerLevel"; public IngestionPipelineRepository(CollectionDAO dao) { super( @@ -252,6 +253,7 @@ public class IngestionPipelineRepository extends EntityRepository WorkflowConfig: + """ + Prepare the workflow config with logLevels and openMetadataServerConfig + :param ingestion_pipeline: Received payload from REST + :return: WorkflowConfig + """ + return WorkflowConfig( + loggerLevel=ingestion_pipeline.loggerLevel, + openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection, + ) + + def build_default_args() -> Dict[str, Any]: """ Build the default_args dict to be passed diff --git a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/metadata.py b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/metadata.py index d97242dc741..5758fa89333 100644 --- a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/metadata.py +++ b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/metadata.py @@ -15,6 +15,7 @@ Metadata DAG function builder from airflow import DAG from openmetadata.workflows.ingestion.common import ( build_dag, + build_workflow_config_property, metadata_ingestion_workflow, ) @@ -29,7 +30,6 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, Sink, - WorkflowConfig, ) @@ -45,9 +45,7 @@ def build_metadata_workflow_config( type="metadata-rest", config={}, ), - workflowConfig=WorkflowConfig( - openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection - ), + workflowConfig=build_workflow_config_property(ingestion_pipeline), ) return workflow_config diff --git a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py index 09830e41735..255ca9337f7 100644 --- a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py +++ b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py @@ -16,6 +16,7 @@ Metadata DAG function builder from airflow import DAG from openmetadata.workflows.ingestion.common import ( build_dag, + build_workflow_config_property, metadata_ingestion_workflow, ) @@ -24,7 +25,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, Processor, Stage, - WorkflowConfig, ) try: @@ -58,9 +58,7 @@ def build_usage_workflow_config( bulkSink=BulkSink( type="metadata-usage", config={"filename": tmp_file.name} ), - workflowConfig=WorkflowConfig( - openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection - ), + workflowConfig=build_workflow_config_property(ingestion_pipeline), ) return workflow_config