Fix Amundsen migration & added logic for table entity owners (#7288)

* Fixed minor bugs with Amundsen ingestion and added logic to add table entity owners

* Updated version requirement for amundsen doc

* Fixed python style
This commit is contained in:
Teddy 2022-09-07 09:20:06 +02:00 committed by GitHub
parent b829a2cbf3
commit 45c457280b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 74 additions and 14 deletions

View File

@ -13,6 +13,8 @@ import traceback
from typing import Iterable, List, Optional from typing import Iterable, List, Optional
from pydantic import SecretStr from pydantic import SecretStr
from sqlalchemy.engine.url import make_url
from tomlkit import table
from metadata.clients.neo4j_client import Neo4JConfig, Neo4jHelper from metadata.clients.neo4j_client import Neo4JConfig, Neo4jHelper
from metadata.config.common import ConfigModel from metadata.config.common import ConfigModel
@ -45,6 +47,7 @@ from metadata.generated.schema.entity.services.databaseService import (
) )
from metadata.generated.schema.entity.tags.tagCategory import Tag from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.entity.teams import team from metadata.generated.schema.entity.teams import team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
@ -148,14 +151,15 @@ class AmundsenSource(Source[Entity]):
pass pass
def next_record(self) -> Iterable[Entity]: def next_record(self) -> Iterable[Entity]:
user_entities = self.neo4j_helper.execute_query(NEO4J_AMUNDSEN_USER_QUERY)
for user in user_entities:
yield from self.create_user_entity(user)
table_entities = self.neo4j_helper.execute_query(NEO4J_AMUNDSEN_TABLE_QUERY) table_entities = self.neo4j_helper.execute_query(NEO4J_AMUNDSEN_TABLE_QUERY)
for table in table_entities: for table in table_entities:
yield from self.create_table_entity(table) yield from self.create_table_entity(table)
user_entities = self.neo4j_helper.execute_query(NEO4J_AMUNDSEN_USER_QUERY)
for user in user_entities:
yield from self.create_user_entity(user)
yield from self.add_owner_to_entity(user)
dashboard_entities = self.neo4j_helper.execute_query( dashboard_entities = self.neo4j_helper.execute_query(
NEO4J_AMUNDSEN_DASHBOARD_QUERY NEO4J_AMUNDSEN_DASHBOARD_QUERY
) )
@ -168,7 +172,7 @@ class AmundsenSource(Source[Entity]):
try: try:
user_metadata = CreateUserRequest( user_metadata = CreateUserRequest(
email=user["email"], email=user["email"],
name=user["full_name"], name=user["full_name"].lower().replace(" ", "_"),
displayName=f"{user['first_name']} {user['last_name']}", displayName=f"{user['first_name']} {user['last_name']}",
) )
team_metadata = CreateTeamRequest( team_metadata = CreateTeamRequest(
@ -184,6 +188,49 @@ class AmundsenSource(Source[Entity]):
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.error(f"Failed to create user entity [{user}]: {exc}") logger.error(f"Failed to create user entity [{user}]: {exc}")
def add_owner_to_entity(self, user):
    """Yield table requests that set an Amundsen user as owner of their tables.

    The user's name is normalised the same way it is when the user is
    created (lower-cased, spaces replaced by underscores) and resolved to
    an entity reference. Every entry in ``user["entities_owned"]`` is then
    mapped to a table FQN and re-emitted as a ``CreateTableRequest``
    carrying that reference as ``owner``.

    Args:
        user: Amundsen user (previously added to OM). Must provide
            ``full_name`` and ``entities_owned``; each owned entity must
            provide a ``key`` that ``make_url`` can parse.

    Yields:
        CreateTableRequest: one per owned table that could be resolved.
        Nothing is yielded when the user reference, the database service
        or the table cannot be found.
    """
    user_entity_ref = self.metadata.get_entity_reference(
        entity=User, fqn=user["full_name"].lower().replace(" ", "_")
    )
    if not user_entity_ref:
        # Without a reference there is no owner to assign; emitting requests
        # with owner=None would only re-send the tables unchanged.
        logger.warning(f"No entity found for user {user['full_name']}")
        return

    for entity in user["entities_owned"]:
        try:
            # The entity key is parsed as a URL: backend name -> service,
            # host -> schema part, database -> table name.
            # NOTE(review): assumes keys look like
            # "<service>://<cluster>.<schema>/<table>" - confirm against Amundsen.
            service_url = make_url(entity["key"])
            service_name = service_url.get_backend_name()
            service_entity: DatabaseService = self.metadata.get_by_name(
                entity=DatabaseService, fqn=service_name
            )
            if not service_entity:
                continue

            if hasattr(service_entity.connection.config, "supportsDatabase"):
                database_schema = service_url.host
            else:
                # Services without database support are stored under the
                # "default" database, so only the last host segment is kept.
                database_schema = f"default.{service_url.host.split('.')[-1]}"
            table_fqn = f"{service_name}.{database_schema}.{service_url.database}"

            table_entity: Table = self.metadata.get_by_name(
                entity=Table, fqn=table_fqn
            )
            if not table_entity:
                # Skip explicitly instead of failing on attribute access below.
                logger.warning(
                    f"No table entity found for [{table_fqn}], "
                    f"cannot set owner {user['full_name']}"
                )
                continue

            yield CreateTableRequest(
                name=table_entity.name,
                tableType=table_entity.tableType,
                description=table_entity.description,
                databaseSchema=table_entity.databaseSchema,
                tags=table_entity.tags,
                columns=table_entity.columns,
                owner=user_entity_ref,
            )
        except Exception as exc:
            # Best effort: one bad owned entity must not stop the others.
            logger.debug(traceback.format_exc())
            logger.error(
                f"Failed to add owner [{user['full_name']}] "
                f"to entity [{entity}]: {exc}"
            )
def create_tags(self, tags): def create_tags(self, tags):
for tag in tags: for tag in tags:
tag_category = OMetaTagAndCategory( tag_category = OMetaTagAndCategory(
@ -206,7 +253,9 @@ class AmundsenSource(Source[Entity]):
service_entity = self.get_database_service(service_name) service_entity = self.get_database_service(service_name)
database_request = CreateDatabaseRequest( database_request = CreateDatabaseRequest(
name="default", name=table["cluster"]
if hasattr(service_entity.connection.config, "supportsDatabase")
else "default",
service=EntityReference(id=service_entity.id, type="databaseService"), service=EntityReference(id=service_entity.id, type="databaseService"),
) )
@ -242,11 +291,21 @@ class AmundsenSource(Source[Entity]):
) )
columns: List[Column] = [] columns: List[Column] = []
for (name, description, data_type) in zip( if len(table["column_names"]) == len(table["column_descriptions"]):
table["column_names"], # zipping on column_descriptions can cause incorrect or no ingestion
table["column_descriptions"], # of column metadata as zip will zip on the smallest list len.
table["column_types"], columns_meta = zip(
): table["column_names"],
table["column_descriptions"],
table["column_types"],
)
else:
columns_meta = zip(
table["column_names"],
[None] * len(table["column_names"]),
table["column_types"],
)
for (name, description, data_type) in columns_meta:
# Amundsen merges the length into type itself. Instead of making changes to our generic type builder # Amundsen merges the length into type itself. Instead of making changes to our generic type builder
# we will do a type match and see if it matches any primitive types and return a type # we will do a type match and see if it matches any primitive types and return a type
data_type = self.get_type_primitive_type(data_type) data_type = self.get_type_primitive_type(data_type)

View File

@ -271,6 +271,7 @@ NEO4J_AMUNDSEN_USER_QUERY = textwrap.dedent(
user.employee_type as employee_type, manager.email as manager_email, user.employee_type as employee_type, manager.email as manager_email,
user.slack_id as slack_id, user.is_active as is_active, user.role_name as role_name, user.slack_id as slack_id, user.is_active as is_active, user.role_name as role_name,
REDUCE(sum_r = 0, r in COLLECT(DISTINCT read)| sum_r + r.read_count) AS total_read, REDUCE(sum_r = 0, r in COLLECT(DISTINCT read)| sum_r + r.read_count) AS total_read,
COLLECT(DISTINCT b) as entities_owned,
count(distinct b) as total_own, count(distinct b) as total_own,
count(distinct c) AS total_follow count(distinct c) AS total_follow
order by user.email order by user.email

View File

@ -11,7 +11,7 @@ In this page, you will learn how to use the `metadata` CLI to run a one-ingestio
<PythonMod connector="Amundsen" module="amundsen" /> <PythonMod connector="Amundsen" module="amundsen" />
Make sure you are running openmetadata-ingestion version 0.10.2 or above. Make sure you are running openmetadata-ingestion version 0.11.0 or above.
## Create Database Services ## Create Database Services
@ -83,8 +83,8 @@ source:
encrypted: <true or false> encrypted: <true or false>
modelClass: <modelclass> modelClass: <modelclass>
sourceConfig: sourceConfig:
config: config: {}
enableDataProfiler: false
sink: sink:
type: metadata-rest type: metadata-rest
config: {} config: {}