Support role ingestion from sample data (#2167)

This commit is contained in:
Matt 2022-01-11 16:51:18 -08:00 committed by GitHub
parent 80cc907224
commit f323afe0cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 50 additions and 3 deletions

View File

@ -11,6 +11,7 @@
"timezone": "PST",
"isAdmin": false,
"teams": "Cloud_Infra",
"roles": ["DataSteward"],
"owns": [],
"follows": []
},

View File

@ -39,6 +39,7 @@ from metadata.generated.schema.entity.services.messagingService import Messaging
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.services.storageService import StorageService
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.entity.teams.role import Role
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type import basic
@ -204,6 +205,9 @@ class OpenMetadata(
if issubclass(entity, Tag):
return "/tags"
if issubclass(entity, get_args(Union[Role, self.get_create_entity_type(Role)])):
return "/roles"
if issubclass(entity, get_args(Union[Team, self.get_create_entity_type(Team)])):
return "/teams"
@ -267,7 +271,11 @@ class OpenMetadata(
if "service" in entity.__name__.lower():
return self.services_path
if "user" in entity.__name__.lower() or "team" in entity.__name__.lower():
if (
"user" in entity.__name__.lower()
or "role" in entity.__name__.lower()
or "team" in entity.__name__.lower()
):
return self.teams_path
return self.data_path

View File

@ -36,6 +36,7 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.api.policies.createPolicy import (
CreatePolicyEntityRequest,
)
from metadata.generated.schema.api.teams.createRole import CreateRoleEntityRequest
from metadata.generated.schema.api.teams.createTeam import CreateTeamEntityRequest
from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest
from metadata.generated.schema.entity.data.chart import ChartType
@ -97,6 +98,7 @@ class MetadataRestSink(Sink[Entity]):
self.charts_dict = {}
self.metadata = OpenMetadata(self.metadata_config)
self.api_client = self.metadata.client
self.role_entities = {}
self.team_entities = {}
self._bootstrap_entities()
@ -387,6 +389,22 @@ class MetadataRestSink(Sink[Entity]):
team_response = self.api_client.get("/teams")
for team in team_response["data"]:
self.team_entities[team["name"]] = team["id"]
role_response = self.api_client.get("/roles")
for role in role_response["data"]:
self.role_entities[role["name"]] = role["id"]
def _create_role(self, role: EntityReference) -> None:
metadata_role = CreateRoleEntityRequest(
name=role.name, displayName=role.name, description=role.description
)
try:
r = self.metadata.create_or_update(metadata_role)
instance_id = str(r.id.__root__)
self.role_entities[role.name] = instance_id
except Exception as err:
logger.debug(traceback.format_exc())
logger.debug(traceback.print_exc())
logger.error(err)
def _create_team(self, team: EntityReference) -> None:
metadata_team = CreateTeamEntityRequest(
@ -397,11 +415,16 @@ class MetadataRestSink(Sink[Entity]):
instance_id = str(r.id.__root__)
self.team_entities[team.name] = instance_id
except Exception as err:
logger.error(traceback.format_exc())
logger.error(traceback.print_exc())
logger.debug(traceback.format_exc())
logger.debug(traceback.print_exc())
logger.error(err)
def write_users(self, record: User):
roles = []
for role in record.roles.__root__:
if role.name not in self.role_entities:
self._create_role(role)
roles.append(self.role_entities[role.name])
teams = []
for team in record.teams.__root__:
if team.name not in self.team_entities:
@ -412,6 +435,7 @@ class MetadataRestSink(Sink[Entity]):
name=record.name.__root__,
displayName=record.displayName,
email=record.email,
roles=roles,
teams=teams,
)
try:

View File

@ -456,12 +456,26 @@ class SampleDataSource(Source[Entity]):
description=f"This is {user['teams']} description.",
)
]
roles = (
[
EntityReference(
id=uuid.uuid4(),
name=role,
type="role",
description=f"This is {role} description.",
)
for role in user["roles"]
]
if "roles" in user
else []
)
user_metadata = User(
id=uuid.uuid4(),
name=user["name"],
displayName=user["displayName"],
email=user["email"],
teams=teams,
roles=roles,
href=Href(__root__="http://localhost"),
)
yield user_metadata