Fix #4491 - PipelineIngestion to support debug (#4564)

Pere Miquel Brull 2022-04-28 12:20:03 +02:00 committed by GitHub
parent e3f216d711
commit a7328ccf1b
9 changed files with 66 additions and 12 deletions

IngestionPipelineRepository.java

@@ -24,6 +24,7 @@ import org.openmetadata.catalog.Entity;
 import org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig;
 import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
 import org.openmetadata.catalog.entity.services.ingestionPipelines.Source;
+import org.openmetadata.catalog.metadataIngestion.LogLevels;
 import org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResource;
 import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
 import org.openmetadata.catalog.type.ChangeDescription;
@@ -35,8 +36,8 @@ import org.openmetadata.catalog.util.FullyQualifiedName;
 import org.openmetadata.catalog.util.JsonUtils;
 
 public class IngestionPipelineRepository extends EntityRepository<IngestionPipeline> {
-  private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,source,airflowConfig";
-  private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,source,airflowConfig";
+  private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,source,airflowConfig,loggerLevel";
+  private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,source,airflowConfig,loggerLevel";
 
   public IngestionPipelineRepository(CollectionDAO dao) {
     super(
@@ -252,6 +253,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
       updateAirflowConfig(origIngestion.getAirflowConfig(), updatedIngestion.getAirflowConfig());
       updateOpenMetadataServerConnection(
           origIngestion.getOpenMetadataServerConnection(), updatedIngestion.getOpenMetadataServerConnection());
+      updateLogLevel(origIngestion.getLoggerLevel(), updatedIngestion.getLoggerLevel());
     }
 
     private void updateSource(Source origSource, Source updatedSource) throws JsonProcessingException {
@@ -282,5 +284,11 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
         recordChange("openMetadataServerConnection", origConfig, updatedConfig);
       }
     }
+
+    private void updateLogLevel(LogLevels origLevel, LogLevels updatedLevel) throws JsonProcessingException {
+      if (updatedLevel != null && !origLevel.equals(updatedLevel)) {
+        recordChange("loggerLevel", origLevel, updatedLevel);
+      }
+    }
   }
 }

IngestionPipelineResource.java

@@ -454,6 +454,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
         .withAirflowConfig(create.getAirflowConfig())
         .withOpenMetadataServerConnection(openMetadataServerConnection)
         .withSource(source)
+        .withLoggerLevel(create.getLoggerLevel())
         .withOwner(create.getOwner())
         .withService(create.getService())
         .withUpdatedBy(securityContext.getUserPrincipal().getName())

createIngestionPipeline.json

@@ -26,6 +26,11 @@
     "airflowConfig": {
       "$ref": "../../../entity/services/ingestionPipelines/ingestionPipeline.json#/definitions/airflowConfig"
     },
+    "loggerLevel": {
+      "description": "Set the logging level for the workflow.",
+      "$ref": "../../../metadataIngestion/workflow.json#/definitions/logLevels",
+      "default": "INFO"
+    },
     "service": {
       "description": "Link to the database service where this database is hosted in.",
       "$ref": "../../../type/entityReference.json"

entity/services/ingestionPipelines/ingestionPipeline.json

@@ -166,6 +166,11 @@
       },
       "default": null
     },
+    "loggerLevel": {
+      "description": "Set the logging level for the workflow.",
+      "$ref": "../../../metadataIngestion/workflow.json#/definitions/logLevels",
+      "default": "INFO"
+    },
     "deployed": {
       "description": "Indicates if the workflow has been successfully deployed to Airflow.",
       "type": "boolean"

metadataIngestion/workflow.json

@@ -122,13 +122,24 @@
       "additionalProperties": false,
       "required": ["type"]
     },
+    "logLevels": {
+      "description": "Supported logging levels",
+      "javaType": "org.openmetadata.catalog.metadataIngestion.LogLevels",
+      "type": "string",
+      "enum": [
+        "DEBUG",
+        "INFO",
+        "WARN",
+        "ERROR"
+      ],
+      "default": "INFO"
+    },
     "workflowConfig": {
       "description": "Configuration for the entire Ingestion Workflow.",
       "type": "object",
       "properties": {
         "loggerLevel": {
-          "type": "string",
-          "enum": ["DEBUG", "INFO", "WARN"],
+          "$ref": "#/definitions/logLevels",
           "default": "INFO"
         },
         "openMetadataServerConfig": {

IngestionPipelineResourceTest.java

@@ -56,6 +56,7 @@ import org.openmetadata.catalog.metadataIngestion.DashboardServiceMetadataPipeli
 import org.openmetadata.catalog.metadataIngestion.DatabaseServiceMetadataPipeline;
 import org.openmetadata.catalog.metadataIngestion.DatabaseServiceQueryUsagePipeline;
 import org.openmetadata.catalog.metadataIngestion.FilterPattern;
+import org.openmetadata.catalog.metadataIngestion.LogLevels;
 import org.openmetadata.catalog.metadataIngestion.MessagingServiceMetadataPipeline;
 import org.openmetadata.catalog.metadataIngestion.SourceConfig;
 import org.openmetadata.catalog.resources.EntityResourceTest;
@@ -250,6 +251,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
     assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
     assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
     assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval());
+    assertEquals(LogLevels.INFO, ingestion.getLoggerLevel());
     ingestion = getEntity(ingestion.getId(), FIELD_OWNER, ADMIN_AUTH_HEADERS);
     assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval());
   }
@@ -334,6 +336,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
         updateIngestionPipeline(
             request
                 .withSourceConfig(updatedSourceConfig)
+                .withLoggerLevel(LogLevels.ERROR)
                 .withAirflowConfig(
                     new AirflowConfig()
                         .withConcurrency(pipelineConcurrency)
@@ -345,6 +348,9 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
     assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
     assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
     assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval());
+    assertEquals(LogLevels.ERROR, updatedIngestion.getLoggerLevel());
+
     validateSourceConfig(updatedSourceConfig, updatedIngestion.getSource().getSourceConfig(), ingestion);
   }

openmetadata/workflows/ingestion/common.py

@@ -12,6 +12,7 @@
 Metadata DAG common functions
 """
 import json
+import logging
 from datetime import datetime, timedelta
 from typing import Any, Callable, Dict, Optional
 
@@ -35,6 +36,7 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel
 )
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataWorkflowConfig,
+    WorkflowConfig,
 )
 
 from metadata.ingestion.api.workflow import Workflow
@@ -48,6 +50,9 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):
     This is the callable used to create the PythonOperator
     """
+
+    logging.getLogger().setLevel(workflow_config.workflowConfig.loggerLevel.value)
+
     config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
 
     workflow = Workflow.create(config)
@@ -66,6 +71,9 @@ def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
     This is the callable used to create the PythonOperator
     """
+
+    logging.getLogger().setLevel(workflow_config.workflowConfig.loggerLevel.value)
+
     config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
 
     workflow = ProfilerWorkflow.create(config)
@@ -87,6 +95,20 @@ def date_to_datetime(
     return datetime.strptime(str(date.__root__), date_format)
 
 
+def build_workflow_config_property(
+    ingestion_pipeline: IngestionPipeline,
+) -> WorkflowConfig:
+    """
+    Prepare the workflow config with logLevels and openMetadataServerConfig
+    :param ingestion_pipeline: Received payload from REST
+    :return: WorkflowConfig
+    """
+    return WorkflowConfig(
+        loggerLevel=ingestion_pipeline.loggerLevel,
+        openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
+    )
+
+
 def build_default_args() -> Dict[str, Any]:
     """
     Build the default_args dict to be passed
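
Worth noting why passing loggerLevel.value straight to setLevel works: each logLevels value is also a valid Python logging level name, with WARN accepted as a built-in alias for WARNING. A standalone sketch:

import logging

# Every logLevels value maps onto a Python logging level name, so the
# generated enum's .value can be handed to setLevel() unchanged
# ("WARN" is a built-in alias for WARNING).
for level in ("DEBUG", "INFO", "WARN", "ERROR"):
    logging.getLogger().setLevel(level)
    print(level, "->", logging.getLevelName(logging.getLogger().getEffectiveLevel()))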

openmetadata/workflows/ingestion/metadata.py

@@ -15,6 +15,7 @@ Metadata DAG function builder
 from airflow import DAG
 
 from openmetadata.workflows.ingestion.common import (
     build_dag,
+    build_workflow_config_property,
     metadata_ingestion_workflow,
 )
@@ -29,7 +30,6 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataWorkflowConfig,
     Sink,
-    WorkflowConfig,
 )
 
 
@@ -45,9 +45,7 @@ def build_metadata_workflow_config(
             type="metadata-rest",
             config={},
         ),
-        workflowConfig=WorkflowConfig(
-            openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection
-        ),
+        workflowConfig=build_workflow_config_property(ingestion_pipeline),
     )
 
     return workflow_config

openmetadata/workflows/ingestion/usage.py

@@ -16,6 +16,7 @@ Metadata DAG function builder
 from airflow import DAG
 
 from openmetadata.workflows.ingestion.common import (
     build_dag,
+    build_workflow_config_property,
     metadata_ingestion_workflow,
 )
@@ -24,7 +25,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataWorkflowConfig,
     Processor,
     Stage,
-    WorkflowConfig,
 )
 
 try:
@@ -58,9 +58,7 @@ def build_usage_workflow_config(
             bulkSink=BulkSink(
                 type="metadata-usage", config={"filename": tmp_file.name}
             ),
-            workflowConfig=WorkflowConfig(
-                openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection
-            ),
+            workflowConfig=build_workflow_config_property(ingestion_pipeline),
         )
 
     return workflow_config