mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-08 05:26:19 +00:00
Add logic to initialize relationships from seed data during application startup (#2307)
* Add logic to initialize relationships from seed data during application startup * Remove ingestion related code for access control policies * Move PolicyEvaluator init to PolicyResource
This commit is contained in:
parent
979bad18d9
commit
2a80caa8fe
@ -0,0 +1,112 @@
|
||||
/*
|
||||
* Copyright 2021 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.catalog.jdbi3;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.regex.Pattern;
|
||||
import lombok.NonNull;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.maven.shared.utils.io.IOUtil;
|
||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
import org.openmetadata.catalog.Entity;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
import org.openmetadata.catalog.type.EntityRelationship;
|
||||
import org.openmetadata.catalog.util.JsonUtils;
|
||||
import org.openmetadata.common.utils.CommonUtil;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class EntityRelationshipRepository {
|
||||
@NonNull private final CollectionDAO daoCollection;
|
||||
|
||||
/**
|
||||
* Initialize entity relationships from json files if seed data does not exist in corresponding tables. Seed data is
|
||||
* stored under catalog-rest-service/src/main/resources/json/data/relationship
|
||||
*/
|
||||
public void initSeedDataFromResources() throws IOException {
|
||||
Pattern pattern = Pattern.compile(".*json/data/relationship/.*\\.json$");
|
||||
List<String> jsonDataFiles = CommonUtil.getResources(pattern);
|
||||
jsonDataFiles.forEach(
|
||||
jsonDataFile -> {
|
||||
try {
|
||||
String json =
|
||||
IOUtil.toString(Objects.requireNonNull(getClass().getClassLoader().getResourceAsStream(jsonDataFile)));
|
||||
initSeedData(JsonUtils.readValue(json, EntityRelationship.class));
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to initialize entity relationship from file {}: {}", jsonDataFile, e.getMessage());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/** Initialize the given relationship from seed data. */
|
||||
@Transaction
|
||||
public void initSeedData(@NonNull EntityRelationship entityRelationship) throws IOException {
|
||||
if (entityRelationship.getFromFQN() == null
|
||||
&& entityRelationship.getToFQN() == null
|
||||
&& entityRelationship.getFromId() != null
|
||||
&& entityRelationship.getToId() != null) {
|
||||
LOG.info(
|
||||
"Ensuring relationship {}({}) --- {} ---> {}({})",
|
||||
entityRelationship.getFromEntity(),
|
||||
entityRelationship.getFromId(),
|
||||
entityRelationship.getRelation(),
|
||||
entityRelationship.getToEntity(),
|
||||
entityRelationship.getToId());
|
||||
addRelationship(
|
||||
entityRelationship.getFromId(),
|
||||
entityRelationship.getToId(),
|
||||
entityRelationship.getFromEntity(),
|
||||
entityRelationship.getToEntity(),
|
||||
Relationship.valueOf(entityRelationship.getRelation()));
|
||||
return;
|
||||
}
|
||||
if (entityRelationship.getFromFQN() != null
|
||||
&& entityRelationship.getToFQN() != null
|
||||
&& entityRelationship.getFromId() == null
|
||||
&& entityRelationship.getToId() == null) {
|
||||
LOG.info(
|
||||
"Ensuring relationship {}({}) --- {} ---> {}({})",
|
||||
entityRelationship.getFromEntity(),
|
||||
entityRelationship.getFromFQN(),
|
||||
entityRelationship.getRelation(),
|
||||
entityRelationship.getToEntity(),
|
||||
entityRelationship.getToFQN());
|
||||
addRelationship(
|
||||
entityRelationship.getFromFQN(),
|
||||
entityRelationship.getToFQN(),
|
||||
entityRelationship.getFromEntity(),
|
||||
entityRelationship.getToEntity(),
|
||||
Relationship.valueOf(entityRelationship.getRelation()));
|
||||
}
|
||||
}
|
||||
|
||||
public void addRelationship(UUID fromId, UUID toId, String fromEntity, String toEntity, Relationship relationship) {
|
||||
daoCollection
|
||||
.relationshipDAO()
|
||||
.insert(fromId.toString(), toId.toString(), fromEntity, toEntity, relationship.ordinal());
|
||||
}
|
||||
|
||||
public void addRelationship(
|
||||
String fromFQN, String toFQN, String fromEntity, String toEntity, Relationship relationship) throws IOException {
|
||||
EntityReference fromRef = Entity.getEntityReferenceByName(fromEntity, fromFQN);
|
||||
EntityReference toRef = Entity.getEntityReferenceByName(toEntity, toFQN);
|
||||
daoCollection
|
||||
.relationshipDAO()
|
||||
.insert(fromRef.getId().toString(), toRef.getId().toString(), fromEntity, toEntity, relationship.ordinal());
|
||||
}
|
||||
}
|
||||
@ -21,7 +21,6 @@ import java.text.ParseException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||
@ -49,7 +48,7 @@ public class PolicyRepository extends EntityRepository<Policy> {
|
||||
new Fields(PolicyResource.FIELD_LIST, "displayName,description,owner,policyUrl,enabled,rules,location");
|
||||
public static final String ENABLED = "enabled";
|
||||
|
||||
private PolicyEvaluator policyEvaluator;
|
||||
private final PolicyEvaluator policyEvaluator;
|
||||
|
||||
public PolicyRepository(CollectionDAO dao) {
|
||||
super(
|
||||
@ -244,10 +243,6 @@ public class PolicyRepository extends EntityRepository<Policy> {
|
||||
return rules;
|
||||
}
|
||||
|
||||
public static List<Object> getRuleObjects(List<Rule> rules) {
|
||||
return rules.stream().map(Object.class::cast).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private void setLocation(Policy policy, EntityReference location) {
|
||||
if (location == null || location.getId() == null) {
|
||||
return;
|
||||
|
||||
@ -33,18 +33,19 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.openmetadata.catalog.CatalogApplicationConfig;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.catalog.jdbi3.EntityRelationshipRepository;
|
||||
import org.openmetadata.catalog.security.Authorizer;
|
||||
import org.openmetadata.catalog.type.CollectionDescriptor;
|
||||
import org.openmetadata.catalog.type.CollectionInfo;
|
||||
import org.openmetadata.catalog.util.RestUtil;
|
||||
import org.reflections.Reflections;
|
||||
|
||||
@Slf4j
|
||||
/**
|
||||
* Collection registry is a registry of all the REST collections in the catalog. It is used for building REST endpoints
|
||||
* that anchor all the collections as follows: - .../api/v1 Provides information about all the collections in the
|
||||
* catalog - .../api/v1/collection-name provides sub collections or resources in that collection
|
||||
*/
|
||||
@Slf4j
|
||||
public final class CollectionRegistry {
|
||||
private static CollectionRegistry instance = null;
|
||||
|
||||
@ -139,6 +140,14 @@ public final class CollectionRegistry {
|
||||
LOG.info("Registering test resource {}", object);
|
||||
environment.jersey().register(object);
|
||||
});
|
||||
|
||||
// All entities from seed data under resources are created as part of createResource calls above.
|
||||
// Initialize relationships from seed data after all resources have been created.
|
||||
try {
|
||||
new EntityRelationshipRepository(jdbi.onDemand(CollectionDAO.class)).initSeedDataFromResources();
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Failed to register relationships from seed data: {}", ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/** Get collection details based on annotations in Resource classes */
|
||||
|
||||
@ -61,6 +61,7 @@ import org.openmetadata.catalog.jdbi3.PolicyRepository;
|
||||
import org.openmetadata.catalog.resources.Collection;
|
||||
import org.openmetadata.catalog.security.Authorizer;
|
||||
import org.openmetadata.catalog.security.SecurityUtil;
|
||||
import org.openmetadata.catalog.security.policyevaluator.PolicyEvaluator;
|
||||
import org.openmetadata.catalog.type.EntityHistory;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
import org.openmetadata.catalog.type.Include;
|
||||
@ -96,8 +97,13 @@ public class PolicyResource {
|
||||
this.authorizer = authorizer;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused") // Method used for reflection
|
||||
@SuppressWarnings("unused") // Method is used for reflection
|
||||
public void initialize(CatalogApplicationConfig config) throws IOException {
|
||||
// Set up the PolicyEvaluator, before loading seed data.
|
||||
PolicyEvaluator policyEvaluator = PolicyEvaluator.getInstance();
|
||||
policyEvaluator.setPolicyRepository(dao);
|
||||
// Load any existing rules from database, before loading seed data.
|
||||
policyEvaluator.refreshRules();
|
||||
dao.initSeedDataFromResources();
|
||||
}
|
||||
|
||||
|
||||
@ -28,7 +28,6 @@ import org.openmetadata.catalog.Entity;
|
||||
import org.openmetadata.catalog.entity.teams.User;
|
||||
import org.openmetadata.catalog.exception.EntityNotFoundException;
|
||||
import org.openmetadata.catalog.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.catalog.jdbi3.PolicyRepository;
|
||||
import org.openmetadata.catalog.jdbi3.UserRepository;
|
||||
import org.openmetadata.catalog.security.policyevaluator.PolicyEvaluator;
|
||||
import org.openmetadata.catalog.type.EntityReference;
|
||||
@ -59,8 +58,7 @@ public class DefaultAuthorizer implements Authorizer {
|
||||
this.userRepository = new UserRepository(collectionDAO);
|
||||
mayBeAddAdminUsers();
|
||||
mayBeAddBotUsers();
|
||||
policyEvaluator = PolicyEvaluator.getInstance();
|
||||
policyEvaluator.setPolicyRepository(new PolicyRepository(collectionDAO));
|
||||
this.policyEvaluator = PolicyEvaluator.getInstance();
|
||||
}
|
||||
|
||||
private void mayBeAddAdminUsers() {
|
||||
|
||||
@ -0,0 +1,7 @@
|
||||
{
|
||||
"fromFQN": "DataSteward",
|
||||
"fromEntity": "role",
|
||||
"toFQN": "DataStewardRoleAccessControlPolicy",
|
||||
"toEntity": "policy",
|
||||
"relation": "CONTAINS"
|
||||
}
|
||||
@ -0,0 +1,49 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/type/entityRelationship.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "Entity Relationship",
|
||||
"description": "This schema defines the EntityRelationship type used for establishing relationship between two entities. EntityRelationship is used for capturing relationships from one entity to another. For example, a database contains tables.",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.catalog.type.EntityRelationship",
|
||||
"properties": {
|
||||
"fromId": {
|
||||
"description": "Unique identifier that identifies the entity from which the relationship originates.",
|
||||
"$ref": "basic.json#/definitions/uuid"
|
||||
},
|
||||
"fromFQN": {
|
||||
"description": "Fully qualified name of the entity from which the relationship originates.",
|
||||
"type": "string"
|
||||
},
|
||||
"fromEntity": {
|
||||
"description": "Type of the entity from which the relationship originates. Examples: `database`, `table`, `metrics` ...",
|
||||
"type": "string"
|
||||
},
|
||||
"toId": {
|
||||
"description": "Unique identifier that identifies the entity towards which the relationship refers to.",
|
||||
"$ref": "basic.json#/definitions/uuid"
|
||||
},
|
||||
"toFQN": {
|
||||
"description": "Fully qualified name of the entity towards which the relationship refers to.",
|
||||
"type": "string"
|
||||
},
|
||||
"toEntity": {
|
||||
"description": "Type of the entity towards which the relationship refers to. Examples: `database`, `table`, `metrics` ...",
|
||||
"type": "string"
|
||||
},
|
||||
"relation": {
|
||||
"description": "Describes relationship between the two entities.",
|
||||
"type": "string"
|
||||
},
|
||||
"deleted" : {
|
||||
"description": "`true` indicates the relationship has been soft deleted.",
|
||||
"type" : "boolean",
|
||||
"default": false
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"fromEntity",
|
||||
"toEntity",
|
||||
"relation"
|
||||
],
|
||||
"additionalProperties": false
|
||||
}
|
||||
@ -1,34 +0,0 @@
|
||||
{
|
||||
"policies": [
|
||||
{
|
||||
"name": "data-steward-role",
|
||||
"displayName": "Data Steward Role Policy",
|
||||
"description": "Policy for Data Steward Role to perform operations on metadata entities",
|
||||
"policyType": "AccessControl",
|
||||
"enabled": true,
|
||||
"rules": [
|
||||
{
|
||||
"name": "update-description",
|
||||
"userRoleAttr": "DataSteward",
|
||||
"operation": "UpdateDescription",
|
||||
"allow": true,
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "update-tags",
|
||||
"userRoleAttr": "DataSteward",
|
||||
"operation": "UpdateOwner",
|
||||
"allow": true,
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "update-tags",
|
||||
"userRoleAttr": "DataSteward",
|
||||
"operation": "UpdateTags",
|
||||
"allow": true,
|
||||
"enabled": true
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
@ -1,19 +0,0 @@
|
||||
{
|
||||
"source": {
|
||||
"type": "access_control_policies",
|
||||
"config": {
|
||||
"policies_file": "ingestion/examples/sample_data/policies/access_control.json"
|
||||
}
|
||||
},
|
||||
"sink": {
|
||||
"type": "metadata-rest",
|
||||
"config": {}
|
||||
},
|
||||
"metadata_server": {
|
||||
"type": "metadata-server",
|
||||
"config": {
|
||||
"api_endpoint": "http://localhost:8585/api",
|
||||
"auth_provider_type": "no-auth"
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,75 +0,0 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from typing import Iterable, Optional
|
||||
|
||||
from metadata.generated.schema.entity.policies.policy import Policy
|
||||
from metadata.ingestion.api.common import ConfigModel, Entity, WorkflowContext
|
||||
from metadata.ingestion.api.source import Source, SourceStatus
|
||||
from metadata.ingestion.models.ometa_policy import OMetaPolicy
|
||||
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
|
||||
|
||||
logger: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AccessControlPoliciesConfig(ConfigModel):
|
||||
policies_file: str
|
||||
|
||||
|
||||
class AccessControlPoliciesSource(Source[Entity]):
|
||||
config: AccessControlPoliciesConfig
|
||||
status: SourceStatus
|
||||
policies_data: Optional[dict] = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: AccessControlPoliciesConfig,
|
||||
metadata_config: MetadataServerConfig,
|
||||
ctx,
|
||||
):
|
||||
self.config = config
|
||||
self.status = SourceStatus()
|
||||
|
||||
@classmethod
|
||||
def create(
|
||||
cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext
|
||||
):
|
||||
config = AccessControlPoliciesConfig.parse_obj(config_dict)
|
||||
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
|
||||
return cls(config, metadata_config, ctx)
|
||||
|
||||
def prepare(self):
|
||||
try:
|
||||
with open(self.config.policies_file, "r") as f:
|
||||
self.policies_data = json.load(f)
|
||||
except Exception as e:
|
||||
logger.fatal(
|
||||
f"Please ensure that the configured policies_file is set up correctly - {e}"
|
||||
)
|
||||
|
||||
def next_record(self) -> Iterable[OMetaPolicy]:
|
||||
try:
|
||||
for policy in self.policies_data["policies"]:
|
||||
# add a generated policy id to reduce overhead of maintaining one for every policy in policies file.
|
||||
policy["id"] = uuid.uuid4()
|
||||
self.status.scanned(policy)
|
||||
yield OMetaPolicy(policy=Policy.parse_obj(policy))
|
||||
except Exception as e:
|
||||
self.status.failure("error", str(e))
|
||||
|
||||
def get_status(self) -> SourceStatus:
|
||||
return self.status
|
||||
|
||||
def close(self):
|
||||
pass # nothing to close.
|
||||
@ -38,10 +38,6 @@ from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
|
||||
from metadata.ingestion.models.table_metadata import Chart, Dashboard
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
|
||||
from metadata.ingestion.source.access_control_policies import (
|
||||
AccessControlPoliciesConfig,
|
||||
AccessControlPoliciesSource,
|
||||
)
|
||||
from metadata.utils.helpers import (
|
||||
get_dashboard_service_or_create,
|
||||
get_database_service_or_create,
|
||||
@ -264,13 +260,6 @@ class SampleDataSource(Source[Entity]):
|
||||
self.models = json.load(
|
||||
open(self.config.sample_data_folder + "/models/models.json", "r")
|
||||
)
|
||||
policies_config = AccessControlPoliciesConfig(
|
||||
policies_file=self.config.sample_data_folder
|
||||
+ "/policies/access_control.json"
|
||||
)
|
||||
self.policies_source = AccessControlPoliciesSource(
|
||||
policies_config, metadata_config, ctx
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def create(cls, config_dict, metadata_config_dict, ctx):
|
||||
@ -279,7 +268,7 @@ class SampleDataSource(Source[Entity]):
|
||||
return cls(config, metadata_config, ctx)
|
||||
|
||||
def prepare(self):
|
||||
self.policies_source.prepare()
|
||||
pass
|
||||
|
||||
def next_record(self) -> Iterable[Entity]:
|
||||
yield from self.ingest_locations()
|
||||
@ -292,7 +281,6 @@ class SampleDataSource(Source[Entity]):
|
||||
yield from self.ingest_lineage()
|
||||
yield from self.ingest_users()
|
||||
yield from self.ingest_mlmodels()
|
||||
yield from self.policies_source.next_record()
|
||||
|
||||
def ingest_locations(self) -> Iterable[Location]:
|
||||
for location in self.locations["locations"]:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user