From 9887e0773ea8d6630a276595128f57a291c8675d Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Tue, 1 Feb 2022 22:55:26 +0100 Subject: [PATCH] Fix #2516 - User Profile & Amundsen Source (#2546) --- .../src/metadata/ingestion/models/user.py | 47 ++------- .../metadata/ingestion/sink/metadata_rest.py | 99 +++++++++++-------- .../src/metadata/ingestion/source/amundsen.py | 19 ++-- .../metadata/ingestion/source/ldap_users.py | 5 +- .../metadata/ingestion/source/sample_data.py | 25 ++--- .../ingestion/source/sql_source_common.py | 2 +- 6 files changed, 91 insertions(+), 106 deletions(-) diff --git a/ingestion/src/metadata/ingestion/models/user.py b/ingestion/src/metadata/ingestion/models/user.py index 4a2473b36c5..9192b22ff04 100644 --- a/ingestion/src/metadata/ingestion/models/user.py +++ b/ingestion/src/metadata/ingestion/models/user.py @@ -8,45 +8,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import List, Optional -import copy -import json -from typing import Any, Dict, List, Optional +from pydantic.main import BaseModel -from metadata.ingestion.models.json_serializable import JsonSerializable - -UNQUOTED_SUFFIX = ":UNQUOTED" +from metadata.generated.schema.api.teams.createRole import CreateRoleRequest +from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest +from metadata.generated.schema.api.teams.createUser import CreateUserRequest -class MetadataOrg(JsonSerializable): - """ - Catalog Org Model - """ - - def __init__(self, name: str, documentation: str = "") -> None: - """ """ - self.name = name - self.documentation = documentation - - -class MetadataTeam(JsonSerializable): - """ - Catalog Team Model - """ - - def __init__(self, name: str, description: str = "") -> None: - """ """ - self.name = name.replace(" ", "_") - self.display_name = name - self.description = description - - -class MetadataRole(JsonSerializable): - """ - Catalog Role - """ - - def __init__(self, name: str, documentation: str = ""): - """ """ - self.name = name - self.documentation = documentation +class OMetaUserProfile(BaseModel): + user: CreateUserRequest + teams: Optional[List[CreateTeamRequest]] = None + roles: Optional[List[CreateRoleRequest]] = None diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 2d8904a729c..ac4bd3f1c92 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -31,10 +31,11 @@ from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest from metadata.generated.schema.api.teams.createUser import CreateUserRequest from metadata.generated.schema.entity.data.chart import ChartType from metadata.generated.schema.entity.data.location import Location -from metadata.generated.schema.entity.data.mlmodel import MlModel from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.policies.policy import Policy +from metadata.generated.schema.entity.teams.role import Role +from metadata.generated.schema.entity.teams.team import Team from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity, WorkflowContext @@ -42,6 +43,7 @@ from metadata.ingestion.api.sink import Sink, SinkStatus from metadata.ingestion.models.ometa_policy import OMetaPolicy from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.table_metadata import Chart, Dashboard, DeleteTable +from metadata.ingestion.models.user import OMetaUserProfile from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig @@ -87,7 +89,6 @@ class MetadataRestSink(Sink[Entity]): self.wrote_something = False self.charts_dict = {} self.metadata = OpenMetadata(self.metadata_config) - self.api_client = self.metadata.client self.role_entities = {} self.team_entities = {} @@ -116,7 +117,7 @@ class MetadataRestSink(Sink[Entity]): self.write_pipelines(record) elif isinstance(record, AddLineageRequest): self.write_lineage(record) - elif isinstance(record, User): + elif isinstance(record, OMetaUserProfile): self.write_users(record) elif isinstance(record, CreateMlModelRequest): self.write_ml_model(record) @@ -374,59 +375,73 @@ class MetadataRestSink(Sink[Entity]): logger.error(err) self.status.failure(f"Model: {model.name}") - def _create_role(self, role: EntityReference) -> None: - metadata_role = CreateRoleRequest( - name=role.name, displayName=role.name, description=role.description - ) + def _create_role(self, create_role: CreateRoleRequest) -> Role: try: - r = self.metadata.create_or_update(metadata_role) - instance_id = str(r.id.__root__) - self.role_entities[role.name] = instance_id + role = self.metadata.create_or_update(create_role) + self.role_entities[role.name] = str(role.id.__root__) + return role except Exception as err: logger.debug(traceback.format_exc()) logger.debug(traceback.print_exc()) logger.error(err) - def _create_team(self, team: EntityReference) -> None: - metadata_team = CreateTeamRequest( - name=team.name, displayName=team.name, description=team.description - ) + def _create_team(self, create_team: CreateTeamRequest) -> Team: try: - r = self.metadata.create_or_update(metadata_team) - instance_id = str(r.id.__root__) - self.team_entities[team.name] = instance_id + team = self.metadata.create_or_update(create_team) + self.team_entities[team.name] = str(team.id.__root__) + return team except Exception as err: logger.debug(traceback.format_exc()) logger.debug(traceback.print_exc()) logger.error(err) - def write_users(self, record: User): - roles = [] - for role in record.roles.__root__: - try: - role_response = self.api_client.get(f"/roles/{role.id.__root__}") - except APIError: - self._create_role(role) - roles.append(self.role_entities[role.name]) - teams = [] - for team in record.teams.__root__: - try: - team_response = self.api_client.get(f"/teams/{team.id.__root__}") - except APIError: - self._create_team(team) - teams.append(self.team_entities[team.name]) + def write_users(self, record: OMetaUserProfile): + """ + Given a User profile (User + Teams + Roles create requests): + 1. Check if role & team exist, otherwise create + 2. Add ids of role & team to the User + 3. Create or update User + """ - metadata_user = CreateUserRequest( - name=record.name.__root__, - displayName=record.displayName, - email=record.email, - roles=roles, - teams=teams, - ) + # Create roles if they don't exist + if record.roles: # Roles can be optional + role_ids = [] + for role in record.roles: + try: + role_entity = self.metadata.get_by_name( + entity=Role, fqdn=str(role.name.__root__) + ) + except APIError: + role_entity = self._create_role(role) + role_ids.append(role_entity.id) + else: + role_ids = None + + # Create teams if they don't exist + if record.teams: # Teams can be optional + team_ids = [] + for team in record.teams: + try: + team_entity = self.metadata.get_by_name( + entity=Team, fqdn=str(team.name.__root__) + ) + except APIError: + team_entity = self._create_team(team) + team_ids.append(team_entity.id) + else: + team_ids = None + + # Update user data with the new Role and Team IDs + user_profile = record.user.dict(exclude_unset=True) + user_profile["roles"] = role_ids + user_profile["teams"] = team_ids + metadata_user = CreateUserRequest(**user_profile) + + # Create user try: - self.metadata.create_or_update(metadata_user) - self.status.records_written(record.displayName) - logger.info("Sink: {}".format(record.displayName)) + user = self.metadata.create_or_update(metadata_user) + self.status.records_written(user.displayName) + logger.info("User: {}".format(user.displayName)) except Exception as err: logger.debug(traceback.format_exc()) logger.debug(traceback.print_exc()) diff --git a/ingestion/src/metadata/ingestion/source/amundsen.py b/ingestion/src/metadata/ingestion/source/amundsen.py index b8f030f9b25..ba68378f9d9 100644 --- a/ingestion/src/metadata/ingestion/source/amundsen.py +++ b/ingestion/src/metadata/ingestion/source/amundsen.py @@ -21,6 +21,8 @@ from metadata.config.common import ConfigModel from metadata.generated.schema.api.services.createDatabaseService import ( CreateDatabaseServiceRequest, ) +from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest +from metadata.generated.schema.api.teams.createUser import CreateUserRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import Column, Table from metadata.generated.schema.entity.services.dashboardService import ( @@ -32,7 +34,7 @@ from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.table_metadata import Chart, Dashboard -from metadata.ingestion.models.user import User +from metadata.ingestion.models.user import OMetaUserProfile from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.ingestion.source.neo4j_helper import Neo4JConfig, Neo4jHelper @@ -119,16 +121,17 @@ class AmundsenSource(Source[Entity]): def create_user_entity(self, user): try: - user_metadata = User( + user_metadata = CreateUserRequest( email=user["email"], - first_name=user["first_name"], - last_name=user["last_name"], name=user["full_name"], - team_name=user["team_name"], - is_active=user["is_active"], + displayName=f"{user['first_name']} {user['last_name']}", + ) + team_metadata = CreateTeamRequest(name=user["team_name"]) + self.status.scanned(str(user_metadata.email)) + yield OMetaUserProfile( + user=user_metadata, + teams=[team_metadata], ) - self.status.scanned(user_metadata.email) - yield user_metadata except Exception as err: logger.error(err) diff --git a/ingestion/src/metadata/ingestion/source/ldap_users.py b/ingestion/src/metadata/ingestion/source/ldap_users.py index 7c90cdfe796..8d45a6f3ae1 100644 --- a/ingestion/src/metadata/ingestion/source/ldap_users.py +++ b/ingestion/src/metadata/ingestion/source/ldap_users.py @@ -18,6 +18,7 @@ from metadata.config.common import ConfigModel from metadata.generated.schema.api.teams.createUser import CreateUserRequest from metadata.ingestion.api.common import WorkflowContext from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.models.user import OMetaUserProfile from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig logger = logging.getLogger(__name__) @@ -29,7 +30,7 @@ class LDAPUserConfig(ConfigModel): password: str -class LdapUsersSource(Source[CreateUserRequest]): +class LdapUsersSource(Source[OMetaUserProfile]): config: LDAPUserConfig status: SourceStatus @@ -89,7 +90,7 @@ class LdapUsersSource(Source[CreateUserRequest]): name=user["attributes"]["givenName"], ) self.status.scanned(user_metadata.name) - yield user_metadata + yield OMetaUserProfile(user=user_metadata) def get_status(self) -> SourceStatus: return self.status diff --git a/ingestion/src/metadata/ingestion/source/sample_data.py b/ingestion/src/metadata/ingestion/source/sample_data.py index ef6a48c6b44..2c5985dc6d0 100644 --- a/ingestion/src/metadata/ingestion/source/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/sample_data.py @@ -24,18 +24,20 @@ from metadata.config.common import ConfigModel from metadata.generated.schema.api.data.createMlModel import CreateMlModelRequest from metadata.generated.schema.api.data.createTopic import CreateTopicRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.api.teams.createRole import CreateRoleRequest +from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest +from metadata.generated.schema.api.teams.createUser import CreateUserRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.location import Location, LocationType from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.table import Table -from metadata.generated.schema.entity.teams.user import User -from metadata.generated.schema.type.basic import Href from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.table_metadata import Chart, Dashboard +from metadata.ingestion.models.user import OMetaUserProfile from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.utils.helpers import ( @@ -433,23 +435,19 @@ class SampleDataSource(Source[Entity]): ) yield model_ev - def ingest_users(self) -> Iterable[User]: + def ingest_users(self) -> Iterable[OMetaUserProfile]: try: for user in self.users["users"]: teams = [ - EntityReference( - id=uuid.uuid4(), + CreateTeamRequest( name=user["teams"], - type="team", description=f"This is {user['teams']} description.", ) ] roles = ( [ - EntityReference( - id=uuid.uuid4(), + CreateRoleRequest( name=role, - type="role", description=f"This is {role} description.", ) for role in user["roles"] @@ -457,16 +455,13 @@ class SampleDataSource(Source[Entity]): if "roles" in user else [] ) - user_metadata = User( - id=uuid.uuid4(), + user_metadata = CreateUserRequest( name=user["name"], displayName=user["displayName"], email=user["email"], - teams=teams, - roles=roles, - href=Href(__root__="http://localhost"), ) - yield user_metadata + + yield OMetaUserProfile(user=user_metadata, teams=teams, roles=roles) except Exception as err: logger.error(err) diff --git a/ingestion/src/metadata/ingestion/source/sql_source_common.py b/ingestion/src/metadata/ingestion/source/sql_source_common.py index 32f1ad6756d..12b9c4f3a32 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source_common.py +++ b/ingestion/src/metadata/ingestion/source/sql_source_common.py @@ -34,7 +34,7 @@ class SQLSourceStatus(SourceStatus): def filter(self, record: str, err: str) -> None: self.filtered.append(record) - logger.warning(f"Dropped Table {record} due to {err}") + logger.warning(f"Filtered Table {record} due to {err}") def build_sql_source_connection_url(