mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-02 13:43:22 +00:00
* FIX #16231 - Ingestion Pipeline w/ Bot Assignment * format
This commit is contained in:
parent
e5fa8c1ec3
commit
97a733b704
@ -18,6 +18,20 @@ SET
|
|||||||
, '$.connection.config.appName'), '$.connection.config.metastoreConnection')
|
, '$.connection.config.appName'), '$.connection.config.metastoreConnection')
|
||||||
WHERE dbse.serviceType = 'DeltaLake';
|
WHERE dbse.serviceType = 'DeltaLake';
|
||||||
|
|
||||||
|
-- Allow all bots to update the ingestion pipeline status
|
||||||
|
UPDATE policy_entity
|
||||||
|
SET json = JSON_ARRAY_APPEND(
|
||||||
|
json,
|
||||||
|
'$.rules',
|
||||||
|
CAST('{
|
||||||
|
"name": "BotRule-IngestionPipeline",
|
||||||
|
"description": "A bot can Edit ingestion pipelines to pass the status",
|
||||||
|
"resources": ["ingestionPipeline"],
|
||||||
|
"operations": ["ViewAll","EditAll"],
|
||||||
|
"effect": "allow"
|
||||||
|
}' AS JSON)
|
||||||
|
)
|
||||||
|
WHERE name = 'DefaultBotPolicy';
|
||||||
|
|
||||||
-- create API service entity
|
-- create API service entity
|
||||||
CREATE TABLE IF NOT EXISTS api_service_entity (
|
CREATE TABLE IF NOT EXISTS api_service_entity (
|
||||||
|
@ -12,6 +12,24 @@ SET json = JSONB_SET(
|
|||||||
WHERE serviceType = 'DeltaLake';
|
WHERE serviceType = 'DeltaLake';
|
||||||
|
|
||||||
|
|
||||||
|
-- Allow all bots to update the ingestion pipeline status
|
||||||
|
UPDATE policy_entity
|
||||||
|
SET json = jsonb_set(
|
||||||
|
json,
|
||||||
|
'{rules}',
|
||||||
|
(json->'rules')::jsonb || to_jsonb(ARRAY[
|
||||||
|
jsonb_build_object(
|
||||||
|
'name', 'BotRule-IngestionPipeline',
|
||||||
|
'description', 'A bot can Edit ingestion pipelines to pass the status',
|
||||||
|
'resources', jsonb_build_array('ingestionPipeline'),
|
||||||
|
'operations', jsonb_build_array('ViewAll', 'EditAll'),
|
||||||
|
'effect', 'allow'
|
||||||
|
)
|
||||||
|
]),
|
||||||
|
true
|
||||||
|
)
|
||||||
|
WHERE json->>'name' = 'DefaultBotPolicy';
|
||||||
|
|
||||||
-- create API service entity
|
-- create API service entity
|
||||||
CREATE TABLE IF NOT EXISTS api_service_entity (
|
CREATE TABLE IF NOT EXISTS api_service_entity (
|
||||||
id VARCHAR(36) GENERATED ALWAYS AS (json ->> 'id') STORED NOT NULL,
|
id VARCHAR(36) GENERATED ALWAYS AS (json ->> 'id') STORED NOT NULL,
|
||||||
|
@ -229,11 +229,6 @@ public final class Entity {
|
|||||||
public static final String ORGANIZATION_NAME = "Organization";
|
public static final String ORGANIZATION_NAME = "Organization";
|
||||||
public static final String ORGANIZATION_POLICY_NAME = "OrganizationPolicy";
|
public static final String ORGANIZATION_POLICY_NAME = "OrganizationPolicy";
|
||||||
public static final String INGESTION_BOT_NAME = "ingestion-bot";
|
public static final String INGESTION_BOT_NAME = "ingestion-bot";
|
||||||
public static final String INGESTION_BOT_ROLE = "IngestionBotRole";
|
|
||||||
public static final String PROFILER_BOT_NAME = "profiler-bot";
|
|
||||||
public static final String PROFILER_BOT_ROLE = "ProfilerBotRole";
|
|
||||||
public static final String QUALITY_BOT_NAME = "quality-bot";
|
|
||||||
public static final String QUALITY_BOT_ROLE = "QualityBotRole";
|
|
||||||
public static final String ALL_RESOURCES = "All";
|
public static final String ALL_RESOURCES = "All";
|
||||||
|
|
||||||
public static final String DOCUMENT = "document";
|
public static final String DOCUMENT = "document";
|
||||||
|
@ -974,7 +974,7 @@ public class IngestionPipelineResource
|
|||||||
}
|
}
|
||||||
secretsManager.decryptIngestionPipeline(ingestionPipeline);
|
secretsManager.decryptIngestionPipeline(ingestionPipeline);
|
||||||
OpenMetadataConnection openMetadataServerConnection =
|
OpenMetadataConnection openMetadataServerConnection =
|
||||||
new OpenMetadataConnectionBuilder(openMetadataApplicationConfig).build();
|
new OpenMetadataConnectionBuilder(openMetadataApplicationConfig, ingestionPipeline).build();
|
||||||
ingestionPipeline.setOpenMetadataServerConnection(
|
ingestionPipeline.setOpenMetadataServerConnection(
|
||||||
secretsManager.encryptOpenMetadataConnection(openMetadataServerConnection, false));
|
secretsManager.encryptOpenMetadataConnection(openMetadataServerConnection, false));
|
||||||
if (authorizer.shouldMaskPasswords(securityContext) && !forceNotMask) {
|
if (authorizer.shouldMaskPasswords(securityContext) && !forceNotMask) {
|
||||||
|
@ -20,8 +20,11 @@ import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineS
|
|||||||
import org.openmetadata.schema.auth.JWTAuthMechanism;
|
import org.openmetadata.schema.auth.JWTAuthMechanism;
|
||||||
import org.openmetadata.schema.auth.SSOAuthMechanism;
|
import org.openmetadata.schema.auth.SSOAuthMechanism;
|
||||||
import org.openmetadata.schema.entity.Bot;
|
import org.openmetadata.schema.entity.Bot;
|
||||||
|
import org.openmetadata.schema.entity.applications.configuration.ApplicationConfig;
|
||||||
|
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
|
||||||
import org.openmetadata.schema.entity.teams.AuthenticationMechanism;
|
import org.openmetadata.schema.entity.teams.AuthenticationMechanism;
|
||||||
import org.openmetadata.schema.entity.teams.User;
|
import org.openmetadata.schema.entity.teams.User;
|
||||||
|
import org.openmetadata.schema.metadataIngestion.ApplicationPipeline;
|
||||||
import org.openmetadata.schema.security.client.OpenMetadataJWTClientConfig;
|
import org.openmetadata.schema.security.client.OpenMetadataJWTClientConfig;
|
||||||
import org.openmetadata.schema.security.secrets.SecretsManagerClientLoader;
|
import org.openmetadata.schema.security.secrets.SecretsManagerClientLoader;
|
||||||
import org.openmetadata.schema.security.secrets.SecretsManagerProvider;
|
import org.openmetadata.schema.security.secrets.SecretsManagerProvider;
|
||||||
@ -41,7 +44,6 @@ import org.openmetadata.service.util.EntityUtil.Fields;
|
|||||||
public class OpenMetadataConnectionBuilder {
|
public class OpenMetadataConnectionBuilder {
|
||||||
|
|
||||||
AuthProvider authProvider;
|
AuthProvider authProvider;
|
||||||
String bot;
|
|
||||||
OpenMetadataJWTClientConfig securityConfig;
|
OpenMetadataJWTClientConfig securityConfig;
|
||||||
private VerifySSL verifySSL;
|
private VerifySSL verifySSL;
|
||||||
private String openMetadataURL;
|
private String openMetadataURL;
|
||||||
@ -64,6 +66,45 @@ public class OpenMetadataConnectionBuilder {
|
|||||||
initializeBotUser(botName);
|
initializeBotUser(botName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public OpenMetadataConnectionBuilder(
|
||||||
|
OpenMetadataApplicationConfig openMetadataApplicationConfig,
|
||||||
|
IngestionPipeline ingestionPipeline) {
|
||||||
|
initializeOpenMetadataConnectionBuilder(openMetadataApplicationConfig);
|
||||||
|
// Try to load the pipeline bot or default to using the ingestion bot
|
||||||
|
try {
|
||||||
|
initializeBotUser(getBotFromPipeline(ingestionPipeline));
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn(
|
||||||
|
String.format(
|
||||||
|
"Could not initialize bot for pipeline [%s] due to [%s]",
|
||||||
|
ingestionPipeline.getPipelineType(), e));
|
||||||
|
initializeBotUser(Entity.INGESTION_BOT_NAME);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getBotFromPipeline(IngestionPipeline ingestionPipeline) {
|
||||||
|
String botName;
|
||||||
|
switch (ingestionPipeline.getPipelineType()) {
|
||||||
|
case METADATA, DBT -> botName = Entity.INGESTION_BOT_NAME;
|
||||||
|
case APPLICATION -> {
|
||||||
|
ApplicationPipeline applicationPipeline =
|
||||||
|
JsonUtils.convertValue(
|
||||||
|
ingestionPipeline.getSourceConfig().getConfig(), ApplicationPipeline.class);
|
||||||
|
ApplicationConfig appConfig =
|
||||||
|
JsonUtils.convertValue(applicationPipeline.getAppConfig(), ApplicationConfig.class);
|
||||||
|
String type = (String) appConfig.getAdditionalProperties().get("type");
|
||||||
|
botName = String.format("%sApplicationBot", type);
|
||||||
|
}
|
||||||
|
// TODO: Remove this once we internalize the DataInsights app
|
||||||
|
// For now we need it since DataInsights has its own pipelineType inherited from when it was
|
||||||
|
// a standalone workflow
|
||||||
|
case DATA_INSIGHT -> botName = "DataInsightsApplicationBot";
|
||||||
|
default -> botName =
|
||||||
|
String.format("%s-bot", ingestionPipeline.getPipelineType().toString().toLowerCase());
|
||||||
|
}
|
||||||
|
return botName;
|
||||||
|
}
|
||||||
|
|
||||||
private void initializeOpenMetadataConnectionBuilder(
|
private void initializeOpenMetadataConnectionBuilder(
|
||||||
OpenMetadataApplicationConfig openMetadataApplicationConfig) {
|
OpenMetadataApplicationConfig openMetadataApplicationConfig) {
|
||||||
botRepository = (BotRepository) Entity.getEntityRepository(Entity.BOT);
|
botRepository = (BotRepository) Entity.getEntityRepository(Entity.BOT);
|
||||||
@ -160,23 +201,21 @@ public class OpenMetadataConnectionBuilder {
|
|||||||
|
|
||||||
private User retrieveIngestionBotUser(String botName) {
|
private User retrieveIngestionBotUser(String botName) {
|
||||||
try {
|
try {
|
||||||
Bot bot1 = botRepository.getByName(null, botName, Fields.EMPTY_FIELDS);
|
Bot bot = botRepository.getByName(null, botName, Fields.EMPTY_FIELDS);
|
||||||
if (bot1.getBotUser() == null) {
|
if (bot.getBotUser() == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
User user =
|
User user =
|
||||||
userRepository.getByName(
|
userRepository.getByName(
|
||||||
null,
|
null,
|
||||||
bot1.getBotUser().getFullyQualifiedName(),
|
bot.getBotUser().getFullyQualifiedName(),
|
||||||
new EntityUtil.Fields(Set.of("authenticationMechanism")));
|
new EntityUtil.Fields(Set.of("authenticationMechanism")));
|
||||||
if (user.getAuthenticationMechanism() != null) {
|
if (user.getAuthenticationMechanism() != null) {
|
||||||
user.getAuthenticationMechanism().setConfig(user.getAuthenticationMechanism().getConfig());
|
user.getAuthenticationMechanism().setConfig(user.getAuthenticationMechanism().getConfig());
|
||||||
}
|
}
|
||||||
return user;
|
return user;
|
||||||
} catch (EntityNotFoundException ex) {
|
} catch (EntityNotFoundException ex) {
|
||||||
LOG.debug(
|
LOG.debug((String.format("User for bot [%s]", botName)) + " [{}] not found.", botName);
|
||||||
(bot == null ? "Bot" : String.format("User for bot [%s]", botName)) + " [{}] not found.",
|
|
||||||
botName);
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -44,7 +44,6 @@ import org.openmetadata.schema.ServiceEntityInterface;
|
|||||||
import org.openmetadata.schema.entity.app.App;
|
import org.openmetadata.schema.entity.app.App;
|
||||||
import org.openmetadata.schema.entity.app.AppRunRecord;
|
import org.openmetadata.schema.entity.app.AppRunRecord;
|
||||||
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
|
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
|
||||||
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
|
|
||||||
import org.openmetadata.schema.system.EventPublisherJob;
|
import org.openmetadata.schema.system.EventPublisherJob;
|
||||||
import org.openmetadata.schema.type.Include;
|
import org.openmetadata.schema.type.Include;
|
||||||
import org.openmetadata.sdk.PipelineServiceClientInterface;
|
import org.openmetadata.sdk.PipelineServiceClientInterface;
|
||||||
@ -452,13 +451,11 @@ public class OpenMetadataOperations implements Callable<Integer> {
|
|||||||
PipelineServiceClientInterface pipelineServiceClient,
|
PipelineServiceClientInterface pipelineServiceClient,
|
||||||
List<List<String>> pipelineStatuses) {
|
List<List<String>> pipelineStatuses) {
|
||||||
try {
|
try {
|
||||||
|
// TODO: IS THIS OK?
|
||||||
LOG.debug(String.format("deploying pipeline %s", pipeline.getName()));
|
LOG.debug(String.format("deploying pipeline %s", pipeline.getName()));
|
||||||
pipeline.setOpenMetadataServerConnection(new OpenMetadataConnectionBuilder(config).build());
|
|
||||||
secretsManager.decryptIngestionPipeline(pipeline);
|
|
||||||
OpenMetadataConnection openMetadataServerConnection =
|
|
||||||
new OpenMetadataConnectionBuilder(config).build();
|
|
||||||
pipeline.setOpenMetadataServerConnection(
|
pipeline.setOpenMetadataServerConnection(
|
||||||
secretsManager.encryptOpenMetadataConnection(openMetadataServerConnection, false));
|
new OpenMetadataConnectionBuilder(config, pipeline).build());
|
||||||
|
secretsManager.decryptIngestionPipeline(pipeline);
|
||||||
ServiceEntityInterface service =
|
ServiceEntityInterface service =
|
||||||
Entity.getEntity(pipeline.getService(), "", Include.NON_DELETED);
|
Entity.getEntity(pipeline.getService(), "", Include.NON_DELETED);
|
||||||
pipelineServiceClient.deployPipeline(pipeline, service);
|
pipelineServiceClient.deployPipeline(pipeline, service);
|
||||||
|
@ -0,0 +1,11 @@
|
|||||||
|
{
|
||||||
|
"name": "lineage-bot",
|
||||||
|
"displayName": "LineageBot",
|
||||||
|
"description": "Bot used for ingesting lineage metadata.",
|
||||||
|
"fullyQualifiedName": "lineage-bot",
|
||||||
|
"botUser": {
|
||||||
|
"name" : "lineage-bot",
|
||||||
|
"type" : "user"
|
||||||
|
},
|
||||||
|
"provider": "system"
|
||||||
|
}
|
@ -0,0 +1,11 @@
|
|||||||
|
{
|
||||||
|
"name": "profiler-bot",
|
||||||
|
"displayName": "ProfilerBot",
|
||||||
|
"description": "Bot used for ingesting profiling & sample data.",
|
||||||
|
"fullyQualifiedName": "profiler-bot",
|
||||||
|
"botUser": {
|
||||||
|
"name" : "profiler-bot",
|
||||||
|
"type" : "user"
|
||||||
|
},
|
||||||
|
"provider": "system"
|
||||||
|
}
|
@ -0,0 +1,11 @@
|
|||||||
|
{
|
||||||
|
"name": "testsuite-bot",
|
||||||
|
"displayName": "TestSuiteBot",
|
||||||
|
"description": "Bot used for ingesting data quality.",
|
||||||
|
"fullyQualifiedName": "testsuite-bot",
|
||||||
|
"botUser": {
|
||||||
|
"name" : "testsuite-bot",
|
||||||
|
"type" : "user"
|
||||||
|
},
|
||||||
|
"provider": "system"
|
||||||
|
}
|
@ -0,0 +1,11 @@
|
|||||||
|
{
|
||||||
|
"name": "usage-bot",
|
||||||
|
"displayName": "UsageBot",
|
||||||
|
"description": "Bot used for ingesting usage metadata.",
|
||||||
|
"fullyQualifiedName": "usage-bot",
|
||||||
|
"botUser": {
|
||||||
|
"name" : "usage-bot",
|
||||||
|
"type" : "user"
|
||||||
|
},
|
||||||
|
"provider": "system"
|
||||||
|
}
|
@ -0,0 +1,9 @@
|
|||||||
|
{
|
||||||
|
"name": "lineage-bot",
|
||||||
|
"roles": [
|
||||||
|
{
|
||||||
|
"name": "LineageBotRole",
|
||||||
|
"type": "role"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
@ -0,0 +1,9 @@
|
|||||||
|
{
|
||||||
|
"name": "profiler-bot",
|
||||||
|
"roles": [
|
||||||
|
{
|
||||||
|
"name": "ProfilerBotRole",
|
||||||
|
"type": "role"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
@ -0,0 +1,9 @@
|
|||||||
|
{
|
||||||
|
"name": "testsuite-bot",
|
||||||
|
"roles": [
|
||||||
|
{
|
||||||
|
"name": "QualityBotRole",
|
||||||
|
"type": "role"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
@ -0,0 +1,9 @@
|
|||||||
|
{
|
||||||
|
"name": "usage-bot",
|
||||||
|
"roles": [
|
||||||
|
{
|
||||||
|
"name": "UsageBotRole",
|
||||||
|
"type": "role"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
@ -13,6 +13,13 @@
|
|||||||
"resources" : ["bot", "webhook"],
|
"resources" : ["bot", "webhook"],
|
||||||
"operations": ["Create", "Delete"],
|
"operations": ["Create", "Delete"],
|
||||||
"effect": "deny"
|
"effect": "deny"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "BotRule-IngestionPipeline",
|
||||||
|
"description" : "A bot can Edit ingestion pipelines to pass the status",
|
||||||
|
"resources" : ["ingestionPipeline"],
|
||||||
|
"operations": ["ViewAll","EditAll"],
|
||||||
|
"effect": "allow"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
{
|
||||||
|
"name": "LineageBotPolicy",
|
||||||
|
"displayName": "Lineage Bot Policy",
|
||||||
|
"fullyQualifiedName": "LineageBotPolicy",
|
||||||
|
"description": "Policy for Lineage Bot to perform operations on metadata entities.",
|
||||||
|
"enabled": true,
|
||||||
|
"allowDelete": false,
|
||||||
|
"provider": "system",
|
||||||
|
"rules": [
|
||||||
|
{
|
||||||
|
"name": "UsageBotRule-Allow-Query",
|
||||||
|
"description" : "Allow creating and updated Queries.",
|
||||||
|
"resources" : ["query"],
|
||||||
|
"operations": ["Create", "EditAll", "ViewAll"],
|
||||||
|
"effect": "allow"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "LineageBotRule-Allow",
|
||||||
|
"description" : "Allow creating and updating lineage",
|
||||||
|
"resources" : ["All"],
|
||||||
|
"operations": ["EditLineage", "EditQueries", "ViewAll"],
|
||||||
|
"effect": "allow"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
{
|
||||||
|
"name": "UsageBotPolicy",
|
||||||
|
"displayName": "Usage Bot Policy",
|
||||||
|
"fullyQualifiedName": "UsageBotPolicy",
|
||||||
|
"description": "Policy for Usage Bot to perform operations on metadata entities.",
|
||||||
|
"enabled": true,
|
||||||
|
"allowDelete": false,
|
||||||
|
"provider": "system",
|
||||||
|
"rules": [
|
||||||
|
{
|
||||||
|
"name": "UsageBotRule-Allow-Query-Table",
|
||||||
|
"description" : "Allow creating and updated Queries.",
|
||||||
|
"resources" : ["query", "table"],
|
||||||
|
"operations": ["Create", "EditAll", "ViewAll"],
|
||||||
|
"effect": "allow"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "UsageBotRule-Allow-Usage",
|
||||||
|
"description" : "Allow handling usage and lifecycle information.",
|
||||||
|
"resources" : ["All"],
|
||||||
|
"operations": ["EditAll", "ViewAll"],
|
||||||
|
"effect": "allow"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
@ -0,0 +1,17 @@
|
|||||||
|
{
|
||||||
|
"name": "LineageBotRole",
|
||||||
|
"displayName": "Lineage bot role",
|
||||||
|
"description": "Role corresponding to a Lineage bot.",
|
||||||
|
"allowDelete": false,
|
||||||
|
"provider": "system",
|
||||||
|
"policies" : [
|
||||||
|
{
|
||||||
|
"type" : "policy",
|
||||||
|
"name" : "DefaultBotPolicy"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type" : "policy",
|
||||||
|
"name" : "LineageBotPolicy"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
@ -0,0 +1,17 @@
|
|||||||
|
{
|
||||||
|
"name": "UsageBotRole",
|
||||||
|
"displayName": "Usage bot role",
|
||||||
|
"description": "Role corresponding to a Usage bot.",
|
||||||
|
"allowDelete": false,
|
||||||
|
"provider": "system",
|
||||||
|
"policies" : [
|
||||||
|
{
|
||||||
|
"type" : "policy",
|
||||||
|
"name" : "DefaultBotPolicy"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type" : "policy",
|
||||||
|
"name" : "UsageBotPolicy"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user