Fix #2516 - User Profile & Amundsen Source (#2546)

This commit is contained in:
Pere Miquel Brull 2022-02-01 22:55:26 +01:00 committed by GitHub
parent 53d70975c4
commit 9887e0773e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 91 additions and 106 deletions

View File

@ -8,45 +8,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional
import copy
import json
from typing import Any, Dict, List, Optional
from pydantic.main import BaseModel
from metadata.ingestion.models.json_serializable import JsonSerializable
UNQUOTED_SUFFIX = ":UNQUOTED"
from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
class MetadataOrg(JsonSerializable):
"""
Catalog Org Model
"""
def __init__(self, name: str, documentation: str = "") -> None:
""" """
self.name = name
self.documentation = documentation
class MetadataTeam(JsonSerializable):
"""
Catalog Team Model
"""
def __init__(self, name: str, description: str = "") -> None:
""" """
self.name = name.replace(" ", "_")
self.display_name = name
self.description = description
class MetadataRole(JsonSerializable):
"""
Catalog Role
"""
def __init__(self, name: str, documentation: str = ""):
""" """
self.name = name
self.documentation = documentation
class OMetaUserProfile(BaseModel):
user: CreateUserRequest
teams: Optional[List[CreateTeamRequest]] = None
roles: Optional[List[CreateRoleRequest]] = None

View File

@ -31,10 +31,11 @@ from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.location import Location
from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.policies.policy import Policy
from metadata.generated.schema.entity.teams.role import Role
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity, WorkflowContext
@ -42,6 +43,7 @@ from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.models.ometa_policy import OMetaPolicy
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import Chart, Dashboard, DeleteTable
from metadata.ingestion.models.user import OMetaUserProfile
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
@ -87,7 +89,6 @@ class MetadataRestSink(Sink[Entity]):
self.wrote_something = False
self.charts_dict = {}
self.metadata = OpenMetadata(self.metadata_config)
self.api_client = self.metadata.client
self.role_entities = {}
self.team_entities = {}
@ -116,7 +117,7 @@ class MetadataRestSink(Sink[Entity]):
self.write_pipelines(record)
elif isinstance(record, AddLineageRequest):
self.write_lineage(record)
elif isinstance(record, User):
elif isinstance(record, OMetaUserProfile):
self.write_users(record)
elif isinstance(record, CreateMlModelRequest):
self.write_ml_model(record)
@ -374,59 +375,73 @@ class MetadataRestSink(Sink[Entity]):
logger.error(err)
self.status.failure(f"Model: {model.name}")
def _create_role(self, role: EntityReference) -> None:
metadata_role = CreateRoleRequest(
name=role.name, displayName=role.name, description=role.description
)
def _create_role(self, create_role: CreateRoleRequest) -> Role:
try:
r = self.metadata.create_or_update(metadata_role)
instance_id = str(r.id.__root__)
self.role_entities[role.name] = instance_id
role = self.metadata.create_or_update(create_role)
self.role_entities[role.name] = str(role.id.__root__)
return role
except Exception as err:
logger.debug(traceback.format_exc())
logger.debug(traceback.print_exc())
logger.error(err)
def _create_team(self, team: EntityReference) -> None:
metadata_team = CreateTeamRequest(
name=team.name, displayName=team.name, description=team.description
)
def _create_team(self, create_team: CreateTeamRequest) -> Team:
try:
r = self.metadata.create_or_update(metadata_team)
instance_id = str(r.id.__root__)
self.team_entities[team.name] = instance_id
team = self.metadata.create_or_update(create_team)
self.team_entities[team.name] = str(team.id.__root__)
return team
except Exception as err:
logger.debug(traceback.format_exc())
logger.debug(traceback.print_exc())
logger.error(err)
def write_users(self, record: User):
roles = []
for role in record.roles.__root__:
try:
role_response = self.api_client.get(f"/roles/{role.id.__root__}")
except APIError:
self._create_role(role)
roles.append(self.role_entities[role.name])
teams = []
for team in record.teams.__root__:
try:
team_response = self.api_client.get(f"/teams/{team.id.__root__}")
except APIError:
self._create_team(team)
teams.append(self.team_entities[team.name])
def write_users(self, record: OMetaUserProfile):
"""
Given a User profile (User + Teams + Roles create requests):
1. Check if role & team exist, otherwise create
2. Add ids of role & team to the User
3. Create or update User
"""
metadata_user = CreateUserRequest(
name=record.name.__root__,
displayName=record.displayName,
email=record.email,
roles=roles,
teams=teams,
)
# Create roles if they don't exist
if record.roles: # Roles can be optional
role_ids = []
for role in record.roles:
try:
role_entity = self.metadata.get_by_name(
entity=Role, fqdn=str(role.name.__root__)
)
except APIError:
role_entity = self._create_role(role)
role_ids.append(role_entity.id)
else:
role_ids = None
# Create teams if they don't exist
if record.teams: # Teams can be optional
team_ids = []
for team in record.teams:
try:
team_entity = self.metadata.get_by_name(
entity=Team, fqdn=str(team.name.__root__)
)
except APIError:
team_entity = self._create_team(team)
team_ids.append(team_entity.id)
else:
team_ids = None
# Update user data with the new Role and Team IDs
user_profile = record.user.dict(exclude_unset=True)
user_profile["roles"] = role_ids
user_profile["teams"] = team_ids
metadata_user = CreateUserRequest(**user_profile)
# Create user
try:
self.metadata.create_or_update(metadata_user)
self.status.records_written(record.displayName)
logger.info("Sink: {}".format(record.displayName))
user = self.metadata.create_or_update(metadata_user)
self.status.records_written(user.displayName)
logger.info("User: {}".format(user.displayName))
except Exception as err:
logger.debug(traceback.format_exc())
logger.debug(traceback.print_exc())

View File

@ -21,6 +21,8 @@ from metadata.config.common import ConfigModel
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.services.dashboardService import (
@ -32,7 +34,7 @@ from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.models.user import User
from metadata.ingestion.models.user import OMetaUserProfile
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.neo4j_helper import Neo4JConfig, Neo4jHelper
@ -119,16 +121,17 @@ class AmundsenSource(Source[Entity]):
def create_user_entity(self, user):
try:
user_metadata = User(
user_metadata = CreateUserRequest(
email=user["email"],
first_name=user["first_name"],
last_name=user["last_name"],
name=user["full_name"],
team_name=user["team_name"],
is_active=user["is_active"],
displayName=f"{user['first_name']} {user['last_name']}",
)
team_metadata = CreateTeamRequest(name=user["team_name"])
self.status.scanned(str(user_metadata.email))
yield OMetaUserProfile(
user=user_metadata,
teams=[team_metadata],
)
self.status.scanned(user_metadata.email)
yield user_metadata
except Exception as err:
logger.error(err)

View File

@ -18,6 +18,7 @@ from metadata.config.common import ConfigModel
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.ingestion.api.common import WorkflowContext
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.user import OMetaUserProfile
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
logger = logging.getLogger(__name__)
@ -29,7 +30,7 @@ class LDAPUserConfig(ConfigModel):
password: str
class LdapUsersSource(Source[CreateUserRequest]):
class LdapUsersSource(Source[OMetaUserProfile]):
config: LDAPUserConfig
status: SourceStatus
@ -89,7 +90,7 @@ class LdapUsersSource(Source[CreateUserRequest]):
name=user["attributes"]["givenName"],
)
self.status.scanned(user_metadata.name)
yield user_metadata
yield OMetaUserProfile(user=user_metadata)
def get_status(self) -> SourceStatus:
return self.status

View File

@ -24,18 +24,20 @@ from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createMlModel import CreateMlModelRequest
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.location import Location, LocationType
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.basic import Href
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.models.user import OMetaUserProfile
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.utils.helpers import (
@ -433,23 +435,19 @@ class SampleDataSource(Source[Entity]):
)
yield model_ev
def ingest_users(self) -> Iterable[User]:
def ingest_users(self) -> Iterable[OMetaUserProfile]:
try:
for user in self.users["users"]:
teams = [
EntityReference(
id=uuid.uuid4(),
CreateTeamRequest(
name=user["teams"],
type="team",
description=f"This is {user['teams']} description.",
)
]
roles = (
[
EntityReference(
id=uuid.uuid4(),
CreateRoleRequest(
name=role,
type="role",
description=f"This is {role} description.",
)
for role in user["roles"]
@ -457,16 +455,13 @@ class SampleDataSource(Source[Entity]):
if "roles" in user
else []
)
user_metadata = User(
id=uuid.uuid4(),
user_metadata = CreateUserRequest(
name=user["name"],
displayName=user["displayName"],
email=user["email"],
teams=teams,
roles=roles,
href=Href(__root__="http://localhost"),
)
yield user_metadata
yield OMetaUserProfile(user=user_metadata, teams=teams, roles=roles)
except Exception as err:
logger.error(err)

View File

@ -34,7 +34,7 @@ class SQLSourceStatus(SourceStatus):
def filter(self, record: str, err: str) -> None:
self.filtered.append(record)
logger.warning(f"Dropped Table {record} due to {err}")
logger.warning(f"Filtered Table {record} due to {err}")
def build_sql_source_connection_url(