Fix #4889 - IngestionPipeline reads source connection dynamically; Fix #4888 - Remove force deploy (#5228)

This commit is contained in:
Pere Miquel Brull 2022-06-02 20:40:53 +02:00 committed by GitHub
parent c2e83923e2
commit a57806553a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 92 additions and 116 deletions

View File

@ -21,8 +21,8 @@ import org.json.JSONObject;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.entity.services.ingestionPipelines.Source;
import org.openmetadata.catalog.metadataIngestion.LogLevels;
import org.openmetadata.catalog.metadataIngestion.SourceConfig;
import org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResource;
import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection;
import org.openmetadata.catalog.type.EntityReference;
@ -33,8 +33,8 @@ import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.PipelineServiceClient;
public class IngestionPipelineRepository extends EntityRepository<IngestionPipeline> {
private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,source,airflowConfig,loggerLevel";
private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,source,airflowConfig,loggerLevel";
private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel";
private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel";
private static PipelineServiceClient pipelineServiceClient;
public IngestionPipelineRepository(CollectionDAO dao) {
@ -107,16 +107,6 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
return getContainer(ingestionPipeline.getId(), Entity.INGESTION_PIPELINE);
}
@Override
protected void postUpdate(IngestionPipeline entity) {
deploy(entity); // Deploy the ingestion pipeline
}
@Override
protected void postCreate(IngestionPipeline entity) {
deploy(entity); // Deploy the ingestion pipeline
}
@Override
protected void postDelete(IngestionPipeline entity) {
pipelineServiceClient.deletePipeline(entity.getName());
@ -126,12 +116,6 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
pipelineServiceClient = client;
}
private void deploy(IngestionPipeline ingestionPipeline) {
if (Boolean.TRUE.equals(ingestionPipeline.getAirflowConfig().getForceDeploy())) {
pipelineServiceClient.deployPipeline(ingestionPipeline);
}
}
/** Handles entity updated from PUT and POST operation. */
public class IngestionPipelineUpdater extends EntityUpdater {
public IngestionPipelineUpdater(IngestionPipeline original, IngestionPipeline updated, Operation operation) {
@ -140,24 +124,20 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
@Override
public void entitySpecificUpdate() throws IOException {
updateSource(original.getSource(), updated.getSource());
updateSourceConfig(original.getSourceConfig(), updated.getSourceConfig());
updateAirflowConfig(original.getAirflowConfig(), updated.getAirflowConfig());
updateOpenMetadataServerConnection(
original.getOpenMetadataServerConnection(), updated.getOpenMetadataServerConnection());
updateLogLevel(original.getLoggerLevel(), updated.getLoggerLevel());
}
private void updateSource(Source origSource, Source updatedSource) throws JsonProcessingException {
JSONObject origSourceConfig = new JSONObject(JsonUtils.pojoToJson(origSource.getSourceConfig().getConfig()));
JSONObject updatedSourceConfig =
new JSONObject(JsonUtils.pojoToJson(updatedSource.getSourceConfig().getConfig()));
JSONObject origSourceConnection = new JSONObject(JsonUtils.pojoToJson(origSource.getServiceConnection()));
JSONObject updatedSourceConnection = new JSONObject(JsonUtils.pojoToJson(updatedSource.getServiceConnection()));
private void updateSourceConfig(SourceConfig origSource, SourceConfig updatedSource)
throws JsonProcessingException {
JSONObject origSourceConfig = new JSONObject(JsonUtils.pojoToJson(origSource.getConfig()));
JSONObject updatedSourceConfig = new JSONObject(JsonUtils.pojoToJson(updatedSource.getConfig()));
if (!origSource.getServiceName().equals(updatedSource.getServiceName())
|| !origSourceConfig.similar(updatedSourceConfig)
|| !origSourceConnection.similar(updatedSourceConnection)) {
recordChange("source", origSource, updatedSource);
if (!origSourceConfig.similar(updatedSourceConfig)) {
recordChange("sourceConfig", origSource, updatedSource);
}
}

View File

@ -32,7 +32,6 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.io.IOException;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import javax.json.JsonPatch;
import javax.validation.Valid;
@ -61,12 +60,7 @@ import org.openmetadata.catalog.airflow.AirflowConfiguration;
import org.openmetadata.catalog.airflow.AirflowRESTClient;
import org.openmetadata.catalog.api.services.ingestionPipelines.CreateIngestionPipeline;
import org.openmetadata.catalog.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.catalog.entity.services.DashboardService;
import org.openmetadata.catalog.entity.services.DatabaseService;
import org.openmetadata.catalog.entity.services.MessagingService;
import org.openmetadata.catalog.entity.services.PipelineService;
import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.catalog.entity.services.ingestionPipelines.Source;
import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.jdbi3.IngestionPipelineRepository;
import org.openmetadata.catalog.jdbi3.ListFilter;
@ -507,63 +501,17 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
}
private IngestionPipeline getIngestionPipeline(CreateIngestionPipeline create, String user) throws IOException {
Source source = buildIngestionSource(create);
OpenMetadataServerConnection openMetadataServerConnection =
OpenMetadataClientSecurityUtil.buildOpenMetadataServerConfig(airflowConfiguration);
return copy(new IngestionPipeline(), create, user)
.withPipelineType(create.getPipelineType())
.withAirflowConfig(create.getAirflowConfig())
.withOpenMetadataServerConnection(openMetadataServerConnection)
.withSource(source)
.withSourceConfig(create.getSourceConfig())
.withLoggerLevel(create.getLoggerLevel())
.withService(create.getService());
}
private Source buildIngestionSource(CreateIngestionPipeline create) throws IOException {
Source source;
String serviceType = create.getService().getType();
Fields serviceFields = new Fields(List.of("connection"));
switch (serviceType) {
case Entity.DATABASE_SERVICE:
DatabaseService databaseService = Entity.getEntity(create.getService(), serviceFields, Include.ALL);
source =
new Source()
.withServiceConnection(databaseService.getConnection())
.withServiceName(databaseService.getName())
.withType(databaseService.getServiceType().value().toLowerCase(Locale.ROOT));
break;
case Entity.DASHBOARD_SERVICE:
DashboardService dashboardService = Entity.getEntity(create.getService(), serviceFields, Include.ALL);
source =
new Source()
.withServiceName(dashboardService.getName())
.withServiceConnection(dashboardService.getConnection())
.withType(dashboardService.getServiceType().value().toLowerCase(Locale.ROOT));
break;
case Entity.MESSAGING_SERVICE:
MessagingService messagingService = Entity.getEntity(create.getService(), serviceFields, Include.ALL);
source =
new Source()
.withServiceName(messagingService.getName())
.withServiceConnection(messagingService.getConnection())
.withType(messagingService.getServiceType().value().toLowerCase(Locale.ROOT));
break;
case Entity.PIPELINE_SERVICE:
PipelineService pipelineService = Entity.getEntity(create.getService(), serviceFields, Include.ALL);
source =
new Source()
.withServiceName(pipelineService.getName())
.withServiceConnection(pipelineService.getConnection())
.withType(pipelineService.getServiceType().value().toLowerCase(Locale.ROOT));
break;
default:
throw new IllegalArgumentException(
String.format("Ingestion Pipeline is not supported for service %s", serviceType));
}
source.setSourceConfig(create.getSourceConfig());
return source;
}
public void addStatus(List<IngestionPipeline> ingestionPipelines) {
listOrEmpty(ingestionPipelines).forEach(this::addStatus);
}

View File

@ -42,11 +42,6 @@
"type": "object",
"javaType": "org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig",
"properties": {
"forceDeploy": {
"description": "Deploy the pipeline by overwriting existing pipeline with the same name.",
"type": "boolean",
"default": false
},
"pausePipeline": {
"description": "pause the pipeline from running once the deploy is finished successfully.",
"type": "boolean",
@ -147,8 +142,8 @@
"description": "Name that uniquely identifies a Pipeline.",
"$ref": "../../../type/basic.json#/definitions/fullyQualifiedEntityName"
},
"source": {
"$ref": "../../../metadataIngestion/workflow.json#/definitions/source"
"sourceConfig": {
"$ref": "../../../metadataIngestion/workflow.json#/definitions/sourceConfig"
},
"openMetadataServerConnection": {
"$ref": "../connections/metadata/openMetadataConnection.json"
@ -205,7 +200,7 @@
"required": [
"name",
"pipelineType",
"source",
"sourceConfig",
"openMetadataServerConnection",
"airflowConfig"
],

View File

@ -238,13 +238,6 @@ public class DatabaseServiceResourceTest extends EntityResourceTest<DatabaseServ
assertEquals(1, updatedService.getPipelines().size());
assertReference(ingestionPipeline.getEntityReference(), updatedService.getPipelines().get(0));
// TODO remove this
DatabaseConnection expectedDatabaseConnection =
JsonUtils.convertValue(ingestionPipeline.getSource().getServiceConnection(), DatabaseConnection.class);
SnowflakeConnection expectedSnowflake =
JsonUtils.convertValue(expectedDatabaseConnection.getConfig(), SnowflakeConnection.class);
assertEquals(expectedSnowflake, snowflakeConnection);
// Delete the database service and ensure ingestion pipeline is deleted
deleteEntity(updatedService.getId(), true, true, ADMIN_AUTH_HEADERS);
ingestionPipelineResourceTest.assertEntityDeleted(ingestionPipeline.getId(), true);

View File

@ -121,7 +121,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
.withPipelineType(PipelineType.METADATA)
.withService(getContainer())
.withSourceConfig(DATABASE_METADATA_CONFIG)
.withAirflowConfig(new AirflowConfig().withStartDate("2021-11-21").withForceDeploy(false));
.withAirflowConfig(new AirflowConfig().withStartDate("2021-11-21"));
}
@Override
@ -139,7 +139,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
IngestionPipeline ingestion, CreateIngestionPipeline createRequest, Map<String, String> authHeaders)
throws HttpResponseException {
assertEquals(createRequest.getAirflowConfig().getConcurrency(), ingestion.getAirflowConfig().getConcurrency());
validateSourceConfig(createRequest.getSourceConfig(), ingestion.getSource().getSourceConfig(), ingestion);
validateSourceConfig(createRequest.getSourceConfig(), ingestion.getSourceConfig(), ingestion);
}
@Override
@ -147,7 +147,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
throws HttpResponseException {
assertEquals(expected.getDisplayName(), updated.getDisplayName());
assertReference(expected.getService(), updated.getService());
assertEquals(expected.getSource().getSourceConfig(), updated.getSource().getSourceConfig());
assertEquals(expected.getSourceConfig(), updated.getSourceConfig());
}
@Override
@ -215,7 +215,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
OK,
ADMIN_AUTH_HEADERS);
String expectedFQN = FullyQualifiedName.add(BIGQUERY_REFERENCE.getFullyQualifiedName(), ingestion.getName());
validateSourceConfig(DATABASE_METADATA_CONFIG, ingestion.getSource().getSourceConfig(), ingestion);
validateSourceConfig(DATABASE_METADATA_CONFIG, ingestion.getSourceConfig(), ingestion);
assertEquals(startDate.toString(), ingestion.getAirflowConfig().getStartDate());
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
@ -254,7 +254,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
OK,
ADMIN_AUTH_HEADERS);
String expectedFQN = FullyQualifiedName.add(BIGQUERY_REFERENCE.getFullyQualifiedName(), ingestion.getName());
validateSourceConfig(queryUsageConfig, ingestion.getSource().getSourceConfig(), ingestion);
validateSourceConfig(queryUsageConfig, ingestion.getSourceConfig(), ingestion);
assertEquals(startDate.toString(), ingestion.getAirflowConfig().getStartDate());
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
@ -320,7 +320,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
assertEquals(LogLevels.ERROR, updatedIngestion.getLoggerLevel());
validateSourceConfig(updatedSourceConfig, updatedIngestion.getSource().getSourceConfig(), ingestion);
validateSourceConfig(updatedSourceConfig, updatedIngestion.getSourceConfig(), ingestion);
}
@Test
@ -374,7 +374,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval());
validateSourceConfig(updatedSourceConfig, updatedIngestion.getSource().getSourceConfig(), ingestion);
validateSourceConfig(updatedSourceConfig, updatedIngestion.getSourceConfig(), ingestion);
}
@Test
@ -427,7 +427,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName());
assertEquals(expectedScheduleInterval, ingestion.getAirflowConfig().getScheduleInterval());
validateSourceConfig(updatedSourceConfig, updatedIngestion.getSource().getSourceConfig(), ingestion);
validateSourceConfig(updatedSourceConfig, updatedIngestion.getSourceConfig(), ingestion);
}
@Test
@ -460,7 +460,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
ADMIN_AUTH_HEADERS);
String expectedFQN = FullyQualifiedName.add(BIGQUERY_REFERENCE.getFullyQualifiedName(), ingestion.getName());
validateSourceConfig(DATABASE_METADATA_CONFIG, ingestion.getSource().getSourceConfig(), ingestionPipeline);
validateSourceConfig(DATABASE_METADATA_CONFIG, ingestion.getSourceConfig(), ingestionPipeline);
assertEquals(startDate.toString(), ingestion.getAirflowConfig().getStartDate());
assertEquals(pipelineConcurrency, ingestion.getAirflowConfig().getConcurrency());
assertEquals(expectedFQN, ingestion.getFullyQualifiedName());

View File

@ -13,12 +13,17 @@ Metadata DAG common functions
"""
import json
from datetime import datetime, timedelta
from typing import Callable, Optional
from typing import Callable, Optional, Union
from airflow import DAG
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.type import basic
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.orm_profiler.api.workflow import ProfilerWorkflow
from metadata.utils.logger import set_loggers_level
@ -33,11 +38,58 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel
from metadata.generated.schema.metadataIngestion.workflow import (
LogLevels,
OpenMetadataWorkflowConfig,
WorkflowConfig,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.ingestion.api.workflow import Workflow
def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
    """
    Use the service EntityReference to build the Source.
    Building the source dynamically helps us to not store any
    sensitive info.
    :param ingestion_pipeline: With the service ref
    :return: WorkflowSource
    :raises ValueError: if the service type is unknown or the
        service cannot be found by name in OpenMetadata
    """
    metadata = OpenMetadata(config=ingestion_pipeline.openMetadataServerConnection)
    service_type = ingestion_pipeline.service.type

    # Dispatch table pairing each service type string with its entity class.
    # NOTE(fix): the previous if/elif chain swapped the classes for
    # "dashboardService" (it fetched MessagingService) and
    # "messagingService" (it fetched DashboardService).
    entity_class_map = {
        "databaseService": DatabaseService,
        "pipelineService": PipelineService,
        "dashboardService": DashboardService,
        "messagingService": MessagingService,
    }

    entity_class = entity_class_map.get(service_type)

    service: Optional[
        Union[DatabaseService, MessagingService, PipelineService, DashboardService]
    ] = None
    if entity_class is not None:
        # Read the service connection dynamically from OpenMetadata so that
        # no sensitive connection details need to be stored in the pipeline.
        service = metadata.get_by_name(
            entity=entity_class, fqn=ingestion_pipeline.service.name
        )

    if not service:
        raise ValueError(f"Could not get service from type {service_type}")

    return WorkflowSource(
        type=service.serviceType.value.lower(),
        serviceName=service.name.__root__,
        serviceConnection=service.connection,
        sourceConfig=ingestion_pipeline.sourceConfig,
    )
def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):
"""
Task that creates and runs the ingestion workflow.

View File

@ -15,6 +15,7 @@ Metadata DAG function builder
from airflow import DAG
from openmetadata.workflows.ingestion.common import (
build_dag,
build_source,
build_workflow_config_property,
metadata_ingestion_workflow,
)
@ -39,8 +40,9 @@ def build_metadata_workflow_config(
"""
Given an airflow_pipeline, prepare the workflow config JSON
"""
workflow_config = OpenMetadataWorkflowConfig(
source=ingestion_pipeline.source,
source=build_source(ingestion_pipeline),
sink=Sink(
type="metadata-rest",
config={},

View File

@ -13,7 +13,11 @@ Profiler DAG function builder
"""
from airflow import DAG
from openmetadata.workflows.ingestion.common import build_dag, profiler_workflow
from openmetadata.workflows.ingestion.common import (
build_dag,
build_source,
profiler_workflow,
)
try:
from airflow.operators.python import PythonOperator
@ -38,7 +42,7 @@ def build_profiler_workflow_config(
Given an airflow_pipeline, prepare the workflow config JSON
"""
workflow_config = OpenMetadataWorkflowConfig(
source=ingestion_pipeline.source,
source=build_source(ingestion_pipeline),
sink=Sink(
type="metadata-rest",
config={},

View File

@ -16,6 +16,7 @@ from pathlib import Path
from airflow import DAG
from openmetadata.workflows.ingestion.common import (
build_dag,
build_source,
build_workflow_config_property,
metadata_ingestion_workflow,
)
@ -49,11 +50,12 @@ def build_usage_config_from_file(
:param filename: staging location file
:return: OpenMetadataWorkflowConfig
"""
usage_source = ingestion_pipeline.source
usage_source.type = f"{usage_source.type}-usage" # Mark the source as usage
source = build_source(ingestion_pipeline)
source.type = f"{source.type}-usage" # Mark the source as usage
return OpenMetadataWorkflowConfig(
source=usage_source,
source=source,
processor=Processor(type="query-parser", config={"filter": ""}),
stage=Stage(
type="table-usage",
@ -73,7 +75,7 @@ def build_usage_workflow_config(
"""
Given an airflow_pipeline, prepare the workflow config JSON
"""
location = ingestion_pipeline.source.sourceConfig.config.stageFileLocation
location = ingestion_pipeline.sourceConfig.config.stageFileLocation
if not location:
with tempfile.NamedTemporaryFile() as tmp_file: