Add ingestion for access_control_policies to support RBAC (#1969)

This commit is contained in:
Matt 2021-12-30 08:25:09 -08:00 committed by GitHub
parent 2184af6ca5
commit ab4c9ede25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 166 additions and 2 deletions

View File

@ -0,0 +1,58 @@
{
"policies": [
{
"name": "data-steward-role",
"displayName": "Data Steward Role Policy",
"description": "Policy for Data Steward Role to perform operations on metadata entities",
"policyType": "AccessControl",
"enabled": true,
"rules": [
{
"name": "update-description",
"userRoleAttr": "DataSteward",
"operation": "UpdateDescription",
"allow": true
},
{
"name": "update-tags",
"userRoleAttr": "DataSteward",
"operation": "UpdateTags",
"allow": true
}
]
},
{
"name": "data-consumer-role",
"displayName": "Data Consumer Role Policy",
"description": "Policy for Data Consumer Role to perform operations on metadata entities",
"policyType": "AccessControl",
"enabled": true,
"rules": [
{
"name": "suggest-description",
"userRoleAttr": "DataConsumer",
"operation": "SuggestDescription",
"allow": true
},
{
"name": "suggest-tags",
"userRoleAttr": "DataConsumer",
"operation": "SuggestTags",
"allow": true
},
{
"name": "update-description",
"userRoleAttr": "DataConsumer",
"operation": "UpdateDescription",
"allow": false
},
{
"name": "update-tags",
"userRoleAttr": "DataConsumer",
"operation": "UpdateTags",
"allow": false
}
]
}
]
}

View File

@ -0,0 +1,19 @@
{
"source": {
"type": "access_control_policies",
"config": {
"policies_file": "ingestion/examples/sample_data/policies/access_control.json"
}
},
"sink": {
"type": "metadata-rest",
"config": {}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
}
}
}

View File

@ -0,0 +1,76 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import uuid
from collections import Iterable
from typing import Optional
from metadata.generated.schema.entity.policies.policy import Policy
from metadata.ingestion.api.common import Entity, ConfigModel, WorkflowContext
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.ometa_policy import OMetaPolicy
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
logger: logging.Logger = logging.getLogger(__name__)
class AccessControlPoliciesConfig(ConfigModel):
policies_file: str
class AccessControlPoliciesSource(Source[Entity]):
config: AccessControlPoliciesConfig
status: SourceStatus
policies_data: Optional[dict] = None
def __init__(
self,
config: AccessControlPoliciesConfig,
metadata_config: MetadataServerConfig,
ctx,
):
self.config = config
self.status = SourceStatus()
@classmethod
def create(
cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext
):
config = AccessControlPoliciesConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
return cls(config, metadata_config, ctx)
def prepare(self):
try:
with open(self.config.policies_file, "r") as f:
self.policies_data = json.load(f)
except Exception as e:
logger.fatal(
f"Please ensure that the configured policies_file is set up correctly - {e}"
)
def next_record(self) -> Iterable[OMetaPolicy]:
try:
for policy in self.policies_data["policies"]:
# add a generated policy id to reduce overhead of maintaining one for every policy in policies file.
policy["id"] = uuid.uuid4()
self.status.scanned(policy)
yield OMetaPolicy(policy=Policy.parse_obj(policy))
except Exception as e:
self.status.failure("error", str(e))
def get_status(self) -> SourceStatus:
return self.status
def close(self):
pass # nothing to close.

View File

@ -26,7 +26,6 @@ from metadata.generated.schema.api.data.createTopic import CreateTopicEntityRequ
from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.location import Location, LocationType
from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.teams.user import User
@ -39,6 +38,10 @@ from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.access_control_policies import (
AccessControlPoliciesConfig,
AccessControlPoliciesSource,
)
from metadata.utils.helpers import (
get_dashboard_service_or_create,
get_database_service_or_create,
@ -261,6 +264,13 @@ class SampleDataSource(Source[Entity]):
self.models = json.load(
open(self.config.sample_data_folder + "/models/models.json", "r")
)
policies_config = AccessControlPoliciesConfig(
policies_file=self.config.sample_data_folder
+ "/policies/access_control.json"
)
self.policies_source = AccessControlPoliciesSource(
policies_config, metadata_config, ctx
)
@classmethod
def create(cls, config_dict, metadata_config_dict, ctx):
@ -269,7 +279,7 @@ class SampleDataSource(Source[Entity]):
return cls(config, metadata_config, ctx)
def prepare(self):
pass
self.policies_source.prepare()
def next_record(self) -> Iterable[Entity]:
yield from self.ingest_locations()
@ -282,6 +292,7 @@ class SampleDataSource(Source[Entity]):
yield from self.ingest_lineage()
yield from self.ingest_users()
yield from self.ingest_mlmodels()
yield from self.policies_source.next_record()
def ingest_locations(self) -> Iterable[Location]:
for location in self.locations["locations"]: