From 13f6f79f309b1957bdcfe7dcc301f40eca115e53 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Mon, 10 Oct 2022 10:16:59 +0530 Subject: [PATCH] Metadata Source Lint (#8018) --- .../ingestion/source/metadata/amundsen.py | 86 +++++++++++------ .../ingestion/source/metadata/atlas.py | 96 +++++++++++-------- .../ingestion/source/metadata/metadata.py | 11 ++- .../source/metadata/metadata_elasticsearch.py | 4 + .../ingestion/source/metadata/migrate.py | 14 +-- .../ingestion/source/metadata/openmetadata.py | 8 +- 6 files changed, 133 insertions(+), 86 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py index 3fef88d7a4f..64cdf2f7100 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py +++ b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py @@ -9,12 +9,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+""" +Amundsen source to extract metadata +""" + import traceback from typing import Iterable, List, Optional from pydantic import SecretStr from sqlalchemy.engine.url import make_url -from tomlkit import table from metadata.clients.neo4j_client import Neo4JConfig, Neo4jHelper from metadata.config.common import ConfigModel @@ -100,25 +103,31 @@ SUPERSET_DEFAULT_CONFIG = { class AmundsenStatus(SourceStatus): - success: List[str] = list() - failures: List[str] = list() - warnings: List[str] = list() - filtered: List[str] = list() + success: List[str] = [] + failures: List[str] = [] + warnings: List[str] = [] + filtered: List[str] = [] - def scanned(self, entity_name: str) -> None: - self.success.append(entity_name) - logger.info("Entity Scanned: {}".format(entity_name)) + def scanned(self, record: str) -> None: + self.success.append(record) + logger.info(f"Entity Scanned: {record}") def failure(self, key: str, reason: str) -> None: self.failures.append({key: reason}) class AmundsenSource(Source[Entity]): + """ + Amundsen source class + """ + dashboard_service: DashboardService def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): self.config = config self.metadata_config = metadata_config + self.database_schema_object = None + self.database_object = None self.metadata = OpenMetadata(self.metadata_config) self.service_connection = self.config.serviceConnection.__root__.config neo4j_config = Neo4JConfig( @@ -207,13 +216,14 @@ class AmundsenSource(Source[Entity]): entity=DatabaseService, fqn=service_url.get_backend_name() ) if service_entity: - table_fqn = "{service}.{database_schema}.{table}".format( - service=service_url.get_backend_name(), - database_schema=service_url.host + service = service_url.get_backend_name() + database_schema = ( + service_url.host if hasattr(service_entity.connection.config, "supportsDatabase") - else f"default.{service_url.host.split('.')[-1]}", - table=service_url.database, + else 
f"default.{service_url.host.split('.')[-1]}" ) + table = service_url.database + table_fqn = f"{service}.{database_schema}.{table}" table_entity: Table = self.metadata.get_by_name( entity=Table, fqn=table_fqn ) @@ -245,11 +255,9 @@ class AmundsenSource(Source[Entity]): yield tag_category logger.info(f"Tag Category {tag_category}, Primary Tag {tag} Ingested") - def create_table_entity(self, table): + def _yield_create_database(self, table): try: - service_name = table["database"] - # TODO: use metadata.get_service_or_create - service_entity = self.get_database_service(service_name) + service_entity = self.get_database_service(table["database"]) database_request = CreateDatabaseRequest( name=table["cluster"] @@ -263,32 +271,47 @@ class AmundsenSource(Source[Entity]): database_fqn = fqn.build( self.metadata, entity_type=Database, - service_name=service_name, - database_name=database_request.name.__root__, + service_name=table["database"], + database_name=table["cluster"], ) - database_object = self.metadata.get_by_name( + self.database_object = self.metadata.get_by_name( entity=Database, fqn=database_fqn ) + except Exception as err: + logger.error(f"Failed to Ingest database due to - {err}") + logger.debug(traceback.format_exc()) + def _yield_create_database_schema(self, table): + try: database_schema_request = CreateDatabaseSchemaRequest( name=table["schema"], - database=EntityReference(id=database_object.id, type="database"), + database=EntityReference(id=self.database_object.id, type="database"), ) yield database_schema_request database_schema_fqn = fqn.build( self.metadata, entity_type=DatabaseSchema, - service_name=service_name, - database_name=database_request.name.__root__, + service_name=table["database"], + database_name=table["cluster"], schema_name=database_schema_request.name.__root__, ) - database_schema_object = self.metadata.get_by_name( + self.database_schema_object = self.metadata.get_by_name( entity=DatabaseSchema, fqn=database_schema_fqn ) + except 
Exception as err: + logger.error(f"Failed to Ingest database due to - {err}") + logger.debug(traceback.format_exc()) + def create_table_entity(self, table): + """ + Process table details and return CreateTableRequest + """ + try: + yield from self._yield_create_database(table) + yield from self._yield_create_database_schema(table) columns: List[Column] = [] if len(table["column_names"]) == len(table["column_descriptions"]): # zipping on column_descriptions can cause incorrect or no ingestion @@ -308,7 +331,9 @@ class AmundsenSource(Source[Entity]): # Amundsen merges the length into type itself. Instead of making changes to our generic type builder # we will do a type match and see if it matches any primitive types and return a type data_type = self.get_type_primitive_type(data_type) - parsed_string = ColumnTypeParser._parse_datatype_string(data_type) + parsed_string = ColumnTypeParser._parse_datatype_string( # pylint: disable=protected-access + data_type + ) # pylint: disable=protected-access parsed_string["name"] = name parsed_string["dataLength"] = 1 parsed_string["description"] = description @@ -383,7 +408,7 @@ class AmundsenSource(Source[Entity]): tableType="Regular", description=table["description"], databaseSchema=EntityReference( - id=database_schema_object.id, type="databaseSchema" + id=self.database_schema_object.id, type="databaseSchema" ), tags=tags, columns=columns, @@ -396,7 +421,6 @@ class AmundsenSource(Source[Entity]): logger.debug(traceback.format_exc()) logger.warning(f"Failed to create table entity [{table}]: {exc}") self.status.failure(table["name"], str(exc)) - return None def create_dashboard_service(self, dashboard: dict): service_name = dashboard["cluster"] @@ -412,6 +436,9 @@ class AmundsenSource(Source[Entity]): ) def create_dashboard_entity(self, dashboard): + """ + Method to process dashboard and return CreateDashboardRequest + """ try: self.status.scanned(dashboard["name"]) yield CreateDashboardRequest( @@ -432,7 +459,6 @@ class 
AmundsenSource(Source[Entity]): logger.debug(traceback.format_exc()) logger.warning(f"Failed to create dashboard entity [{dashboard}]: {exc}") self.status.failure(dashboard["name"], str(exc)) - return None def create_chart_entity(self, dashboard): for (name, chart_id, chart_type, url) in zip( @@ -471,8 +497,8 @@ class AmundsenSource(Source[Entity]): service = self.metadata.get_by_name(entity=DatabaseService, fqn=service_name) if service is not None: return service - else: - logger.error(f"Please create a service with name {service_name}") + logger.error(f"Please create a service with name {service_name}") + return None def test_connection(self) -> None: pass diff --git a/ingestion/src/metadata/ingestion/source/metadata/atlas.py b/ingestion/src/metadata/ingestion/source/metadata/atlas.py index cd641630b52..a795cbc8060 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/atlas.py +++ b/ingestion/src/metadata/ingestion/source/metadata/atlas.py @@ -1,3 +1,18 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Atlas source to extract metadata +""" + import traceback from dataclasses import dataclass from pathlib import Path @@ -40,8 +55,8 @@ logger = ingestion_logger() class AtlasSourceStatus(SourceStatus): - tables_scanned: List[str] = list() - filtered: List[str] = list() + tables_scanned: List[str] = [] + filtered: List[str] = [] def table_scanned(self, table: str) -> None: self.tables_scanned.append(table) @@ -52,6 +67,10 @@ class AtlasSourceStatus(SourceStatus): @dataclass class AtlasSource(Source): + """ + Atlas source class + """ + config: WorkflowSource atlas_client: AtlasClient status: AtlasSourceStatus @@ -63,7 +82,6 @@ class AtlasSource(Source): config: WorkflowSource, metadata_config: OpenMetadataConnection, ): - super().__init__() self.config = config self.metadata_config = metadata_config self.metadata = OpenMetadata(metadata_config) @@ -78,11 +96,18 @@ class AtlasSource(Source): if not path.is_file(): logger.error(f"File not found {self.service_connection.entityTypes}") raise FileNotFoundError() - with open(self.service_connection.entityTypes, "r") as f: - self.service_connection.entityTypes = yaml.load(f, Loader=yaml.SafeLoader) + with open( + self.service_connection.entityTypes, "r", encoding="utf-8" + ) as entity_types_file: + self.service_connection.entityTypes = yaml.load( + entity_types_file, Loader=yaml.SafeLoader + ) self.tables: Dict[str, Any] = {} self.topics: Dict[str, Any] = {} + self.service = None + self.message_service = None + @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): @@ -98,7 +123,6 @@ class AtlasSource(Source): """ Not required to implement """ - pass def next_record(self): for key in self.service_connection.entityTypes["Table"].keys(): @@ -121,13 +145,15 @@ class AtlasSource(Source): yield from self._parse_topic_entity(topic) def close(self): - return super().close() + """ + Not required to implement + """ def get_status(self) -> SourceStatus: return self.status def 
_parse_topic_entity(self, name): - for key in self.topics.keys(): + for key in self.topics: topic_entity = self.atlas_client.get_entity(self.topics[key]) tpc_entities = topic_entity["entities"] for tpc_entity in tpc_entities: @@ -164,55 +190,45 @@ class AtlasSource(Source): db_entity = tbl_entity["relationshipAttributes"][ self.service_connection.entityTypes["Table"][name]["db"] ] - database_request = self.get_database_entity( - db_entity["displayText"] - ) - table_name = tbl_attrs["name"] - tbl_description = tbl_attrs["description"] - yield database_request + yield self.get_database_entity(db_entity["displayText"]) database_fqn = fqn.build( self.metadata, entity_type=Database, service_name=self.service_connection.dbService, - database_name=database_request.name.__root__, + database_name=db_entity["displayText"], ) database_object = self.metadata.get_by_name( entity=Database, fqn=database_fqn ) - tbl_attrs = tbl_entity["attributes"] - db_entity = tbl_entity["relationshipAttributes"]["db"] - - database_schema_request = CreateDatabaseSchemaRequest( + yield CreateDatabaseSchemaRequest( name=db_entity["displayText"], database=EntityReference( id=database_object.id, type="database" ), ) - yield database_schema_request database_schema_fqn = fqn.build( self.metadata, entity_type=DatabaseSchema, service_name=self.config.serviceConnection.__root__.config.dbService, - database_name=database_request.name.__root__, - schema_name=database_schema_request.name.__root__, + database_name=db_entity["displayText"], + schema_name=db_entity["displayText"], ) database_schema_object = self.metadata.get_by_name( entity=DatabaseSchema, fqn=database_schema_fqn ) - table_request = CreateTableRequest( - name=table_name, + yield CreateTableRequest( + name=tbl_attrs["name"], databaseSchema=EntityReference( id=database_schema_object.id, type="databaseSchema" ), - description=tbl_description, + description=tbl_attrs["description"], columns=tbl_columns, ) - yield table_request yield from 
self.ingest_lineage(tbl_entity["guid"], name) @@ -239,7 +255,7 @@ class AtlasSource(Source): dataType=ColumnTypeParser.get_column_type( column["dataType"].upper() ), - dataTypeDisplay="{}({})".format(column["dataType"], "1"), + dataTypeDisplay=column["dataType"], dataLength=col_data_length, ordinalPosition=ordinal_pos, ) @@ -257,6 +273,9 @@ class AtlasSource(Source): ) def ingest_lineage(self, source_guid, name) -> Iterable[AddLineageRequest]: + """ + Fetch and ingest lineage + """ lineage_response = self.atlas_client.get_lineage(source_guid) lineage_relations = lineage_response["relations"] tbl_entity = self.atlas_client.get_entity(lineage_response["baseEntityGuid"]) @@ -321,18 +340,19 @@ class AtlasSource(Source): ) yield lineage - def get_lineage_entity_ref(self, fqn, metadata_config, type) -> EntityReference: + def get_lineage_entity_ref( + self, to_fqn, metadata_config, entity_type + ) -> EntityReference: metadata = OpenMetadata(metadata_config) - if type == "table": - table = metadata.get_by_name(entity=Table, fqn=fqn) - if not table: - return - return EntityReference(id=table.id.__root__, type="table") - elif type == "pipeline": - pipeline = metadata.get_by_name(entity=Pipeline, fqn=fqn) - if not pipeline: - return - return EntityReference(id=pipeline.id.__root__, type="pipeline") + if entity_type == "table": + table = metadata.get_by_name(entity=Table, fqn=to_fqn) + if table: + return EntityReference(id=table.id.__root__, type="table") + if entity_type == "pipeline": + pipeline = metadata.get_by_name(entity=Pipeline, fqn=to_fqn) + if pipeline: + return EntityReference(id=pipeline.id.__root__, type="pipeline") + return None def test_connection(self) -> None: pass diff --git a/ingestion/src/metadata/ingestion/source/metadata/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/metadata.py index 1c662eab1df..78b36ff2cab 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/metadata.py +++ 
b/ingestion/src/metadata/ingestion/source/metadata/metadata.py @@ -42,9 +42,9 @@ logger = ingestion_logger() class MetadataSourceStatus(SourceStatus): - success: List[str] = list() - failures: List[str] = list() - warnings: List[str] = list() + success: List[str] = [] + failures: List[str] = [] + warnings: List[str] = [] def scanned_entity(self, entity_class_name: str, entity_name: str) -> None: self.success.append(entity_name) @@ -59,6 +59,9 @@ class MetadataSourceStatus(SourceStatus): class MetadataSource(Source[Entity]): + """ + Metadata Source to Fetch All Entities from backend + """ config: WorkflowSource report: SourceStatus @@ -85,7 +88,7 @@ class MetadataSource(Source[Entity]): def create(cls, config_dict, metadata_config: OpenMetadataConnection): raise NotImplementedError("Create Method not implemented") - def next_record(self) -> Iterable[Entity]: + def next_record(self) -> Iterable[Entity]: # pylint: disable=too-many-branches if self.service_connection.includeTables: yield from self.fetch_entities( entity_class=Table, diff --git a/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch.py b/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch.py index f9ebecdead0..2bb49f6d45c 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch.py +++ b/ingestion/src/metadata/ingestion/source/metadata/metadata_elasticsearch.py @@ -27,6 +27,10 @@ logger = ingestion_logger() class MetadataElasticsearchSource(MetadataSource): + """ + Metadata Elastic Search Source + Used for metadata to ES pipeline + """ config: WorkflowSource report: SourceStatus diff --git a/ingestion/src/metadata/ingestion/source/metadata/migrate.py b/ingestion/src/metadata/ingestion/source/metadata/migrate.py index 1e4128167de..3fcf59c5c0d 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/migrate.py +++ b/ingestion/src/metadata/ingestion/source/metadata/migrate.py @@ -70,17 +70,14 @@ class DatabaseServiceWrapper: class 
MigrateSource(MetadataSource): + """ + Metadata Migrate source module + to migrate from 0.9 to 0.10 + """ config: WorkflowSource report: SourceStatus - def __init__( - self, - config: WorkflowSource, - metadata_config: OpenMetadataConnection, - ): - super().__init__(config, metadata_config) - @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): config: WorkflowSource = WorkflowSource.parse_obj(config_dict) @@ -92,6 +89,9 @@ class MigrateSource(MetadataSource): return cls(config, metadata_config) def next_record(self) -> Iterable[Entity]: + """ + Fetch all relevant entities + """ if self.service_connection.includeTables: yield from self.fetch_tables( fields=[ diff --git a/ingestion/src/metadata/ingestion/source/metadata/openmetadata.py b/ingestion/src/metadata/ingestion/source/metadata/openmetadata.py index 5e710a75307..56dc9f07beb 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/openmetadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/openmetadata.py @@ -25,17 +25,11 @@ logger = ingestion_logger() class OpenmetadataSource(MetadataSource): + """Metadata source Class""" config: WorkflowSource report: SourceStatus - def __init__( - self, - config: WorkflowSource, - metadata_config: OpenMetadataConnection, - ): - super().__init__(config, metadata_config) - @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): config: WorkflowSource = WorkflowSource.parse_obj(config_dict)