Fix #4243: Fixed Amundsen Ingestion (#5319)

This commit is contained in:
Mayur Singal 2022-06-07 10:53:34 +05:30 committed by GitHub
parent a2e25d7cc3
commit fe757624bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -17,8 +17,9 @@ from typing import Iterable, List, Optional
from pydantic import SecretStr
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
from metadata.generated.schema.api.tags.createTag import CreateTagRequest
from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
)
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
@ -38,21 +39,25 @@ from metadata.generated.schema.entity.services.databaseService import (
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.models.user import OMetaUserProfile
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.neo4j_helper import Neo4JConfig, Neo4jHelper
from metadata.utils import fqn
from metadata.utils.column_type_parser import ColumnTypeParser
from metadata.utils.helpers import get_dashboard_service_or_create
from metadata.utils.logger import ingestion_logger
from metadata.utils.neo4j_helper import Neo4JConfig, Neo4jHelper
from metadata.utils.sql_queries import (
NEO4J_AMUNDSEN_DASHBOARD_QUERY,
NEO4J_AMUNDSEN_TABLE_QUERY,
@ -72,6 +77,8 @@ class AmundsenConfig(ConfigModel):
PRIMITIVE_TYPES = ["int", "char", "varchar"]
AMUNDSEN_TAG_CATEGORY = "AmundsenTags"
AMUNDSEN_TABLE_TAG = "amundsen_table"
@dataclass
@ -93,8 +100,8 @@ class AmundsenSource(Source[Entity]):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
self.config = config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(self.metadata_config)
self.service_connection = config.serviceConnection.__root__.config
self.metadata = OpenMetadata(self.metadata_config)
neo4j_config = Neo4JConfig(
username=self.service_connection.username,
@ -156,15 +163,30 @@ class AmundsenSource(Source[Entity]):
except Exception as err:
logger.error(err)
def create_tags(self, tags):
for tag in tags:
tag_category = OMetaTagAndCategory(
category_name=CreateTagCategoryRequest(
name=AMUNDSEN_TAG_CATEGORY,
description="Tags associates with amundsen entities",
categoryType="Descriptive",
),
category_details=CreateTagRequest(
name=tag, description="Amundsen Table Tag"
),
)
yield tag_category
logger.info(f"Tag Category {tag_category}, Primary Tag {tag} Ingested")
def create_table_entity(self, table):
try:
service_name = table["cluster"]
service_type = table["database"]
service_name = table["database"]
service_type = self.database_service_map.get(
service_name.lower(), DatabaseServiceType.Mysql.value
)
# TODO: use metadata.get_service_or_create
service_entity = self.get_database_service_or_create(
service_name, service_type
)
service_entity = self.get_database_service(service_name)
database = Database(
id=uuid.uuid4(),
name="default",
@ -188,23 +210,79 @@ class AmundsenSource(Source[Entity]):
parsed_string = ColumnTypeParser._parse_datatype_string(data_type)
parsed_string["name"] = name
parsed_string["dataLength"] = 1
parsed_string["description"] = description
col = Column(**parsed_string)
columns.append(col)
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=service_name,
database_name=database.name.__root__,
schema_name=database_schema.name.__root__,
table_name=table["name"],
amundsen_table_tag = OMetaTagAndCategory(
category_name=CreateTagCategoryRequest(
name=AMUNDSEN_TAG_CATEGORY,
description="Tags associates with amundsen entities",
categoryType="Descriptive",
),
category_details=CreateTagRequest(
name=AMUNDSEN_TABLE_TAG, description="Amundsen Table Tag"
),
)
yield amundsen_table_tag
amundsen_cluster_tag = OMetaTagAndCategory(
category_name=CreateTagCategoryRequest(
name=AMUNDSEN_TAG_CATEGORY,
description="Tags associates with amundsen entities",
categoryType="Descriptive",
),
category_details=CreateTagRequest(
name=table["cluster"], description="Amundsen Cluster Tag"
),
)
yield amundsen_cluster_tag
tags = [
TagLabel(
tagFQN=fqn.build(
self.metadata,
Tag,
tag_category_name=AMUNDSEN_TAG_CATEGORY,
tag_name=AMUNDSEN_TABLE_TAG,
),
labelType="Automated",
state="Suggested",
source="Tag",
),
TagLabel(
tagFQN=fqn.build(
self.metadata,
Tag,
tag_category_name=AMUNDSEN_TAG_CATEGORY,
tag_name=table["cluster"],
),
labelType="Automated",
state="Suggested",
source="Tag",
),
]
if table["tags"]:
yield from self.create_tags(table["tags"])
tags.extend(
[
TagLabel(
tagFQN=fqn.build(
self.metadata,
Tag,
tag_category_name=AMUNDSEN_TAG_CATEGORY,
tag_name=tag,
),
labelType="Automated",
state="Suggested",
source="Tag",
)
for tag in table["tags"]
]
)
table_entity = Table(
id=uuid.uuid4(),
name=table["name"],
tableType="Regular",
description=table["description"],
fullyQualifiedName=table_fqn,
tags=tags,
columns=columns,
)
@ -225,7 +303,7 @@ class AmundsenSource(Source[Entity]):
service_entity = get_dashboard_service_or_create(
service_name,
DashboardServiceType.Superset.name,
{},
{"username": "test", "hostPort": "http://localhost:8088"},
self.metadata_config,
)
self.status.scanned(dashboard["name"])
@ -248,7 +326,7 @@ class AmundsenSource(Source[Entity]):
service_entity = get_dashboard_service_or_create(
service_name,
DashboardServiceType.Superset.name,
{},
{"username": "test", "hostPort": "http://localhost:8088"},
self.metadata_config,
)
@ -282,25 +360,12 @@ class AmundsenSource(Source[Entity]):
return p_type
return data_type
def get_database_service_or_create(
self, service_name: str, service_type: str
) -> DatabaseService:
def get_database_service(self, service_name: str) -> DatabaseService:
service = self.metadata.get_by_name(entity=DatabaseService, fqn=service_name)
if service is not None:
return service
else:
service = {
"name": service_name,
"description": "",
"serviceType": self.database_service_map.get(
service_type.lower(), DatabaseServiceType.Mysql.value
),
"connection": {"config": {}},
}
created_service = self.metadata.create_or_update(
CreateDatabaseServiceRequest(**service)
)
return created_service
logger.error(f"Please create a service with name {service_name}")
def test_connection(self) -> None:
pass