Metadata Source Lint (#8018)

This commit is contained in:
Mayur Singal 2022-10-10 10:16:59 +05:30 committed by GitHub
parent b8e989af6c
commit 13f6f79f30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 133 additions and 86 deletions

View File

@ -9,12 +9,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Amundsen source to extract metadata
"""
import traceback
from typing import Iterable, List, Optional
from pydantic import SecretStr
from sqlalchemy.engine.url import make_url
from tomlkit import table
from metadata.clients.neo4j_client import Neo4JConfig, Neo4jHelper
from metadata.config.common import ConfigModel
@ -100,25 +103,31 @@ SUPERSET_DEFAULT_CONFIG = {
class AmundsenStatus(SourceStatus):
success: List[str] = list()
failures: List[str] = list()
warnings: List[str] = list()
filtered: List[str] = list()
success: List[str] = []
failures: List[str] = []
warnings: List[str] = []
filtered: List[str] = []
def scanned(self, entity_name: str) -> None:
self.success.append(entity_name)
logger.info("Entity Scanned: {}".format(entity_name))
def scanned(self, record: str) -> None:
self.success.append(record)
logger.info(f"Entity Scanned: {record}")
def failure(self, key: str, reason: str) -> None:
self.failures.append({key: reason})
class AmundsenSource(Source[Entity]):
"""
Amundsen source class
"""
dashboard_service: DashboardService
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
self.config = config
self.metadata_config = metadata_config
self.database_schema_object = None
self.database_object = None
self.metadata = OpenMetadata(self.metadata_config)
self.service_connection = self.config.serviceConnection.__root__.config
neo4j_config = Neo4JConfig(
@ -207,13 +216,14 @@ class AmundsenSource(Source[Entity]):
entity=DatabaseService, fqn=service_url.get_backend_name()
)
if service_entity:
table_fqn = "{service}.{database_schema}.{table}".format(
service=service_url.get_backend_name(),
database_schema=service_url.host
service = service_url.get_backend_name()
database_schema = (
service_url.host
if hasattr(service_entity.connection.config, "supportsDatabase")
else f"default.{service_url.host.split('.')[-1]}",
table=service_url.database,
else f"default.{service_url.host.split('.')[-1]}"
)
table = service_url.database
table_fqn = f"{service}.{database_schema}.{table}"
table_entity: Table = self.metadata.get_by_name(
entity=Table, fqn=table_fqn
)
@ -245,11 +255,9 @@ class AmundsenSource(Source[Entity]):
yield tag_category
logger.info(f"Tag Category {tag_category}, Primary Tag {tag} Ingested")
def create_table_entity(self, table):
def _yield_create_database(self, table):
try:
service_name = table["database"]
# TODO: use metadata.get_service_or_create
service_entity = self.get_database_service(service_name)
service_entity = self.get_database_service(table["database"])
database_request = CreateDatabaseRequest(
name=table["cluster"]
@ -263,32 +271,47 @@ class AmundsenSource(Source[Entity]):
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
service_name=service_name,
database_name=database_request.name.__root__,
service_name=table["database"],
database_name=table["cluster"],
)
database_object = self.metadata.get_by_name(
self.database_object = self.metadata.get_by_name(
entity=Database, fqn=database_fqn
)
except Exception as err:
logger.error(f"Failed to Ingest database due to - {err}")
logger.debug(traceback.format_exc())
def _yield_create_database_schema(self, table):
try:
database_schema_request = CreateDatabaseSchemaRequest(
name=table["schema"],
database=EntityReference(id=database_object.id, type="database"),
database=EntityReference(id=self.database_object.id, type="database"),
)
yield database_schema_request
database_schema_fqn = fqn.build(
self.metadata,
entity_type=DatabaseSchema,
service_name=service_name,
database_name=database_request.name.__root__,
service_name=table["database"],
database_name=table["cluster"],
schema_name=database_schema_request.name.__root__,
)
database_schema_object = self.metadata.get_by_name(
self.database_schema_object = self.metadata.get_by_name(
entity=DatabaseSchema, fqn=database_schema_fqn
)
except Exception as err:
logger.error(f"Failed to Ingest database due to - {err}")
logger.debug(traceback.format_exc())
def create_table_entity(self, table):
"""
Process table details and return CreateTableRequest
"""
try:
yield from self._yield_create_database(table)
yield from self._yield_create_database_schema(table)
columns: List[Column] = []
if len(table["column_names"]) == len(table["column_descriptions"]):
# zipping on column_descriptions can cause incorrect or no ingestion
@ -308,7 +331,9 @@ class AmundsenSource(Source[Entity]):
# Amundsen merges the length into type itself. Instead of making changes to our generic type builder
# we will do a type match and see if it matches any primitive types and return a type
data_type = self.get_type_primitive_type(data_type)
parsed_string = ColumnTypeParser._parse_datatype_string(data_type)
parsed_string = ColumnTypeParser._parse_datatype_string( # pylint: disable=protected-access
data_type
) # pylint: disable=protected-access
parsed_string["name"] = name
parsed_string["dataLength"] = 1
parsed_string["description"] = description
@ -383,7 +408,7 @@ class AmundsenSource(Source[Entity]):
tableType="Regular",
description=table["description"],
databaseSchema=EntityReference(
id=database_schema_object.id, type="databaseSchema"
id=self.database_schema_object.id, type="databaseSchema"
),
tags=tags,
columns=columns,
@ -396,7 +421,6 @@ class AmundsenSource(Source[Entity]):
logger.debug(traceback.format_exc())
logger.warning(f"Failed to create table entity [{table}]: {exc}")
self.status.failure(table["name"], str(exc))
return None
def create_dashboard_service(self, dashboard: dict):
service_name = dashboard["cluster"]
@ -412,6 +436,9 @@ class AmundsenSource(Source[Entity]):
)
def create_dashboard_entity(self, dashboard):
"""
Method to process dashboard and return CreateDashboardRequest
"""
try:
self.status.scanned(dashboard["name"])
yield CreateDashboardRequest(
@ -432,7 +459,6 @@ class AmundsenSource(Source[Entity]):
logger.debug(traceback.format_exc())
logger.warning(f"Failed to create dashboard entity [{dashboard}]: {exc}")
self.status.failure(dashboard["name"], str(exc))
return None
def create_chart_entity(self, dashboard):
for (name, chart_id, chart_type, url) in zip(
@ -471,8 +497,8 @@ class AmundsenSource(Source[Entity]):
service = self.metadata.get_by_name(entity=DatabaseService, fqn=service_name)
if service is not None:
return service
else:
logger.error(f"Please create a service with name {service_name}")
logger.error(f"Please create a service with name {service_name}")
return None
def test_connection(self) -> None:
pass

View File

@ -1,3 +1,18 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Atlas source to extract metadata
"""
import traceback
from dataclasses import dataclass
from pathlib import Path
@ -40,8 +55,8 @@ logger = ingestion_logger()
class AtlasSourceStatus(SourceStatus):
tables_scanned: List[str] = list()
filtered: List[str] = list()
tables_scanned: List[str] = []
filtered: List[str] = []
def table_scanned(self, table: str) -> None:
self.tables_scanned.append(table)
@ -52,6 +67,10 @@ class AtlasSourceStatus(SourceStatus):
@dataclass
class AtlasSource(Source):
"""
Atlas source class
"""
config: WorkflowSource
atlas_client: AtlasClient
status: AtlasSourceStatus
@ -63,7 +82,6 @@ class AtlasSource(Source):
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
@ -78,11 +96,18 @@ class AtlasSource(Source):
if not path.is_file():
logger.error(f"File not found {self.service_connection.entityTypes}")
raise FileNotFoundError()
with open(self.service_connection.entityTypes, "r") as f:
self.service_connection.entityTypes = yaml.load(f, Loader=yaml.SafeLoader)
with open(
self.service_connection.entityTypes, "r", encoding="utf-8"
) as entity_types_file:
self.service_connection.entityTypes = yaml.load(
entity_types_file, Loader=yaml.SafeLoader
)
self.tables: Dict[str, Any] = {}
self.topics: Dict[str, Any] = {}
self.service = None
self.message_service = None
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@ -98,7 +123,6 @@ class AtlasSource(Source):
"""
Not required to implement
"""
pass
def next_record(self):
for key in self.service_connection.entityTypes["Table"].keys():
@ -121,13 +145,15 @@ class AtlasSource(Source):
yield from self._parse_topic_entity(topic)
def close(self):
return super().close()
"""
Not required to implement
"""
def get_status(self) -> SourceStatus:
return self.status
def _parse_topic_entity(self, name):
for key in self.topics.keys():
for key in self.topics:
topic_entity = self.atlas_client.get_entity(self.topics[key])
tpc_entities = topic_entity["entities"]
for tpc_entity in tpc_entities:
@ -164,55 +190,45 @@ class AtlasSource(Source):
db_entity = tbl_entity["relationshipAttributes"][
self.service_connection.entityTypes["Table"][name]["db"]
]
database_request = self.get_database_entity(
db_entity["displayText"]
)
table_name = tbl_attrs["name"]
tbl_description = tbl_attrs["description"]
yield database_request
yield self.get_database_entity(db_entity["displayText"])
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
service_name=self.service_connection.dbService,
database_name=database_request.name.__root__,
database_name=db_entity["displayText"],
)
database_object = self.metadata.get_by_name(
entity=Database, fqn=database_fqn
)
tbl_attrs = tbl_entity["attributes"]
db_entity = tbl_entity["relationshipAttributes"]["db"]
database_schema_request = CreateDatabaseSchemaRequest(
yield CreateDatabaseSchemaRequest(
name=db_entity["displayText"],
database=EntityReference(
id=database_object.id, type="database"
),
)
yield database_schema_request
database_schema_fqn = fqn.build(
self.metadata,
entity_type=DatabaseSchema,
service_name=self.config.serviceConnection.__root__.config.dbService,
database_name=database_request.name.__root__,
schema_name=database_schema_request.name.__root__,
database_name=db_entity["displayText"],
schema_name=db_entity["displayText"],
)
database_schema_object = self.metadata.get_by_name(
entity=DatabaseSchema, fqn=database_schema_fqn
)
table_request = CreateTableRequest(
name=table_name,
yield CreateTableRequest(
name=tbl_attrs["name"],
databaseSchema=EntityReference(
id=database_schema_object.id, type="databaseSchema"
),
description=tbl_description,
description=tbl_attrs["description"],
columns=tbl_columns,
)
yield table_request
yield from self.ingest_lineage(tbl_entity["guid"], name)
@ -239,7 +255,7 @@ class AtlasSource(Source):
dataType=ColumnTypeParser.get_column_type(
column["dataType"].upper()
),
dataTypeDisplay="{}({})".format(column["dataType"], "1"),
dataTypeDisplay=column["dataType"],
dataLength=col_data_length,
ordinalPosition=ordinal_pos,
)
@ -257,6 +273,9 @@ class AtlasSource(Source):
)
def ingest_lineage(self, source_guid, name) -> Iterable[AddLineageRequest]:
"""
Fetch and ingest lineage
"""
lineage_response = self.atlas_client.get_lineage(source_guid)
lineage_relations = lineage_response["relations"]
tbl_entity = self.atlas_client.get_entity(lineage_response["baseEntityGuid"])
@ -321,18 +340,19 @@ class AtlasSource(Source):
)
yield lineage
def get_lineage_entity_ref(self, fqn, metadata_config, type) -> EntityReference:
def get_lineage_entity_ref(
self, to_fqn, metadata_config, entity_type
) -> EntityReference:
metadata = OpenMetadata(metadata_config)
if type == "table":
table = metadata.get_by_name(entity=Table, fqn=fqn)
if not table:
return
return EntityReference(id=table.id.__root__, type="table")
elif type == "pipeline":
pipeline = metadata.get_by_name(entity=Pipeline, fqn=fqn)
if not pipeline:
return
return EntityReference(id=pipeline.id.__root__, type="pipeline")
if entity_type == "table":
table = metadata.get_by_name(entity=Table, fqn=to_fqn)
if table:
return EntityReference(id=table.id.__root__, type="table")
if entity_type == "pipeline":
pipeline = metadata.get_by_name(entity=Pipeline, fqn=to_fqn)
if pipeline:
return EntityReference(id=pipeline.id.__root__, type="pipeline")
return None
def test_connection(self) -> None:
pass

View File

@ -42,9 +42,9 @@ logger = ingestion_logger()
class MetadataSourceStatus(SourceStatus):
success: List[str] = list()
failures: List[str] = list()
warnings: List[str] = list()
success: List[str] = []
failures: List[str] = []
warnings: List[str] = []
def scanned_entity(self, entity_class_name: str, entity_name: str) -> None:
self.success.append(entity_name)
@ -59,6 +59,9 @@ class MetadataSourceStatus(SourceStatus):
class MetadataSource(Source[Entity]):
"""
Metadata Source to Fetch All Entities from backend
"""
config: WorkflowSource
report: SourceStatus
@ -85,7 +88,7 @@ class MetadataSource(Source[Entity]):
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
raise NotImplementedError("Create Method not implemented")
def next_record(self) -> Iterable[Entity]:
def next_record(self) -> Iterable[Entity]: # pylint: disable=too-many-branches
if self.service_connection.includeTables:
yield from self.fetch_entities(
entity_class=Table,

View File

@ -27,6 +27,10 @@ logger = ingestion_logger()
class MetadataElasticsearchSource(MetadataSource):
"""
Metadata Elastic Search Source
Used for metadata to ES pipeline
"""
config: WorkflowSource
report: SourceStatus

View File

@ -70,17 +70,14 @@ class DatabaseServiceWrapper:
class MigrateSource(MetadataSource):
"""
Metadata Migrate source module
to migrate from 0.9 to 0.10
"""
config: WorkflowSource
report: SourceStatus
def __init__(
self,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
):
super().__init__(config, metadata_config)
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
@ -92,6 +89,9 @@ class MigrateSource(MetadataSource):
return cls(config, metadata_config)
def next_record(self) -> Iterable[Entity]:
"""
Fetch all relevant entities
"""
if self.service_connection.includeTables:
yield from self.fetch_tables(
fields=[

View File

@ -25,17 +25,11 @@ logger = ingestion_logger()
class OpenmetadataSource(MetadataSource):
"""Metadata source Class"""
config: WorkflowSource
report: SourceStatus
def __init__(
self,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
):
super().__init__(config, metadata_config)
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)