diff --git a/.gitignore b/.gitignore index 99cd96b3004..00d89d66255 100644 --- a/.gitignore +++ b/.gitignore @@ -26,6 +26,7 @@ catalog-services/catalog-services.iml # local docker volume docker/local-metadata/docker-volume +docker-volume # Java template *.class diff --git a/ingestion/src/metadata/clients/atlas_client.py b/ingestion/src/metadata/clients/atlas_client.py index 492d6cdf571..379936f1ae7 100644 --- a/ingestion/src/metadata/clients/atlas_client.py +++ b/ingestion/src/metadata/clients/atlas_client.py @@ -28,12 +28,11 @@ class AtlasClient: def __init__(self, config: AtlasConnection, raw_data: bool = False): self.config = config - config_obj = self.config.serviceConnection.__root__.config self.auth_token = generate_http_basic_token( - config_obj.username, config_obj.password.get_secret_value() + config.username, config.password.get_secret_value() ) client_config: ClientConfig = ClientConfig( - base_url=config_obj.atlasHost, + base_url=config.hostPort, auth_header="Authorization", api_version="api", auth_token=self.get_auth_token, diff --git a/ingestion/src/metadata/examples/workflows/atlas_mapping.yaml b/ingestion/src/metadata/examples/workflows/atlas_mapping.yaml deleted file mode 100644 index ab728d09bca..00000000000 --- a/ingestion/src/metadata/examples/workflows/atlas_mapping.yaml +++ /dev/null @@ -1,7 +0,0 @@ -Table: - rdbms_table: - db: rdbms_db - column: rdbms_column -Topic: -- kafka_topic -- kafka_topic_2 diff --git a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py index a5e6e2e42b0..60ee624d336 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py +++ b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py @@ -66,10 +66,10 @@ from metadata.ingestion.ometa.client_utils import get_chart_entities_from_id from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from 
metadata.utils import fqn -from metadata.utils.amundsen_helper import SERVICE_TYPE_MAPPER from metadata.utils.connections import get_connection from metadata.utils.helpers import get_standard_chart_type from metadata.utils.logger import ingestion_logger +from metadata.utils.metadata_service_helper import SERVICE_TYPE_MAPPER from metadata.utils.sql_queries import ( NEO4J_AMUNDSEN_DASHBOARD_QUERY, NEO4J_AMUNDSEN_TABLE_QUERY, diff --git a/ingestion/src/metadata/ingestion/source/metadata/atlas.py b/ingestion/src/metadata/ingestion/source/metadata/atlas.py index a795cbc8060..f057cd850dd 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/atlas.py +++ b/ingestion/src/metadata/ingestion/source/metadata/atlas.py @@ -15,11 +15,8 @@ Atlas source to extract metadata import traceback from dataclasses import dataclass -from pathlib import Path from typing import Any, Dict, Iterable, List -import yaml - from metadata.clients.atlas_client import AtlasClient from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( @@ -28,6 +25,16 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.data.createTopic import CreateTopicRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceRequest, +) +from metadata.generated.schema.api.services.createMessagingService import ( + CreateMessagingServiceRequest, +) +from metadata.generated.schema.api.tags.createTag import CreateTagRequest +from metadata.generated.schema.api.tags.createTagCategory import ( + CreateTagCategoryRequest, +) from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from 
metadata.generated.schema.entity.data.pipeline import Pipeline @@ -38,21 +45,28 @@ from metadata.generated.schema.entity.services.connections.metadata.atlasConnect from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) -from metadata.generated.schema.entity.services.databaseService import DatabaseService -from metadata.generated.schema.entity.services.messagingService import MessagingService +from metadata.generated.schema.entity.tags.tagCategory import Tag from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.tagLabel import TagLabel from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus +from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.utils import fqn +from metadata.utils.connections import get_connection from metadata.utils.logger import ingestion_logger +from metadata.utils.metadata_service_helper import SERVICE_TYPE_MAPPER logger = ingestion_logger() +ATLAS_TAG_CATEGORY = "AtlasMetadata" +ATLAS_TABLE_TAG = "atlas_table" +ENTITY_TYPES = {"Table": {"Table": {"db": "db", "column": "columns"}}} + class AtlasSourceStatus(SourceStatus): tables_scanned: List[str] = [] @@ -88,20 +102,7 @@ class AtlasSource(Source): self.service_connection = self.config.serviceConnection.__root__.config self.status = AtlasSourceStatus() - self.schema_registry_url = "http://localhost:8081" - self.bootstrap_servers = "http://localhost:9092" - - self.atlas_client = AtlasClient(config) - path = Path(self.service_connection.entityTypes) - if not path.is_file(): - logger.error(f"File not 
found {self.service_connection.entityTypes}") - raise FileNotFoundError() - with open( - self.service_connection.entityTypes, "r", encoding="utf-8" - ) as entity_types_file: - self.service_connection.entityTypes = yaml.load( - entity_types_file, Loader=yaml.SafeLoader - ) + self.atlas_client = get_connection(self.service_connection) self.tables: Dict[str, Any] = {} self.topics: Dict[str, Any] = {} @@ -125,16 +126,12 @@ class AtlasSource(Source): """ def next_record(self): - for key in self.service_connection.entityTypes["Table"].keys(): - self.service = self.metadata.get_by_name( - entity=DatabaseService, fqn=self.service_connection.dbService - ) + for key in ENTITY_TYPES["Table"]: + self.service = self.get_database_service() self.tables[key] = self.atlas_client.list_entities(entity_type=key) - for key in self.service_connection.entityTypes.get("Topic", []): - self.message_service = self.metadata.get_by_name( - entity=MessagingService, fqn=self.service_connection.messagingService - ) + for key in ENTITY_TYPES.get("Topic", []): + self.message_service = self.get_message_service() self.topics[key] = self.atlas_client.list_entities(entity_type=key) if self.tables: @@ -188,17 +185,15 @@ class AtlasSource(Source): ) tbl_attrs = tbl_entity["attributes"] db_entity = tbl_entity["relationshipAttributes"][ - self.service_connection.entityTypes["Table"][name]["db"] + ENTITY_TYPES["Table"][name]["db"] ] yield self.get_database_entity(db_entity["displayText"]) - database_fqn = fqn.build( self.metadata, entity_type=Database, - service_name=self.service_connection.dbService, + service_name=self.service.name.__root__, database_name=db_entity["displayText"], ) - database_object = self.metadata.get_by_name( entity=Database, fqn=database_fqn ) @@ -213,7 +208,7 @@ class AtlasSource(Source): database_schema_fqn = fqn.build( self.metadata, entity_type=DatabaseSchema, - service_name=self.config.serviceConnection.__root__.config.dbService, + service_name=self.service.name.__root__, 
database_name=db_entity["displayText"], schema_name=db_entity["displayText"], ) @@ -221,6 +216,8 @@ class AtlasSource(Source): entity=DatabaseSchema, fqn=database_schema_fqn ) + yield self.create_tag() + yield CreateTableRequest( name=tbl_attrs["name"], databaseSchema=EntityReference( @@ -228,6 +225,7 @@ class AtlasSource(Source): ), description=tbl_attrs["description"], columns=tbl_columns, + tags=self.get_tags(), ) yield from self.ingest_lineage(tbl_entity["guid"], name) @@ -236,10 +234,38 @@ class AtlasSource(Source): logger.debug(traceback.format_exc()) logger.warning(f"Failed to parse {table_entity}: {exc}") + def get_tags(self): + tags = [ + TagLabel( + tagFQN=fqn.build( + self.metadata, + Tag, + tag_category_name=ATLAS_TAG_CATEGORY, + tag_name=ATLAS_TABLE_TAG, + ), + labelType="Automated", + state="Suggested", + source="Tag", + ) + ] + return tags + + def create_tag(self) -> OMetaTagAndCategory: + atlas_table_tag = OMetaTagAndCategory( + category_name=CreateTagCategoryRequest( + name=ATLAS_TAG_CATEGORY, + description="Tags associated with atlas entities", + ), + category_details=CreateTagRequest( + name=ATLAS_TABLE_TAG, description="Atlas Cluster Tag" + ), + ) + return atlas_table_tag + def _parse_table_columns(self, table_response, tbl_entity, name) -> List[Column]: om_cols = [] col_entities = tbl_entity["relationshipAttributes"][ - self.service_connection.entityTypes["Table"][name]["column"] + ENTITY_TYPES["Table"][name]["column"] ] referred_entities = table_response["referredEntities"] ordinal_pos = 1 @@ -281,11 +307,11 @@ class AtlasSource(Source): tbl_entity = self.atlas_client.get_entity(lineage_response["baseEntityGuid"]) for key in tbl_entity["referredEntities"].keys(): if not tbl_entity["entities"][0]["relationshipAttributes"].get( - self.service_connection.entityTypes["Table"][name]["db"] + ENTITY_TYPES["Table"][name]["db"] ): continue db_entity = tbl_entity["entities"][0]["relationshipAttributes"][ -
self.service_connection.entityTypes["Table"][name]["db"] + ENTITY_TYPES["Table"][name]["db"] ] if not tbl_entity["referredEntities"].get(key): continue @@ -313,7 +339,7 @@ class AtlasSource(Source): tbl_entity = self.atlas_client.get_entity(edge["toEntityId"]) for key in tbl_entity["referredEntities"]: db_entity = tbl_entity["entities"][0]["relationshipAttributes"][ - self.service_connection.entityTypes["Table"][name]["db"] + ENTITY_TYPES["Table"][name]["db"] ] db = self.get_database_entity(db_entity["displayText"]) @@ -333,6 +359,34 @@ class AtlasSource(Source): ) yield from self.yield_lineage(from_entity_ref, to_entity_ref) + def get_database_service(self): + service = self.metadata.create_or_update( + CreateDatabaseServiceRequest( + name=SERVICE_TYPE_MAPPER.get("hive")["service_name"], + displayName="hive", + serviceType=SERVICE_TYPE_MAPPER.get("hive")["service_name"], + connection=SERVICE_TYPE_MAPPER["hive"]["connection"], + ) + ) + if service is not None: + return service + logger.error("Failed to create a service with name hive") + return None + + def get_message_service(self): + service = self.metadata.create_or_update( + CreateMessagingServiceRequest( + name=SERVICE_TYPE_MAPPER.get("kafka")["service_name"], + displayName=SERVICE_TYPE_MAPPER.get("kafka")["service_name"], + serviceType=SERVICE_TYPE_MAPPER.get("kafka")["service_name"], + connection=SERVICE_TYPE_MAPPER.get("kafka")["connection"], + ) + ) + if service is not None: + return service + logger.error("Failed to create a service with name kafka") + return None + def yield_lineage(self, from_entity_ref, to_entity_ref): if from_entity_ref and to_entity_ref and from_entity_ref != to_entity_ref: lineage = AddLineageRequest( diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py index a3b17312508..b7cafa1d316 100644 --- a/ingestion/src/metadata/utils/connections.py +++ b/ingestion/src/metadata/utils/connections.py @@ -29,6 +29,7 @@ from sqlalchemy.orm
import scoped_session, sessionmaker from sqlalchemy.orm.session import Session from sqlalchemy.pool import QueuePool +from metadata.clients.atlas_client import AtlasClient from metadata.clients.connection_clients import ( AirByteClient, AmundsenClient, @@ -127,6 +128,9 @@ from metadata.generated.schema.entity.services.connections.messaging.redpandaCon from metadata.generated.schema.entity.services.connections.metadata.amundsenConnection import ( AmundsenConnection, ) +from metadata.generated.schema.entity.services.connections.metadata.atlasConnection import ( + AtlasConnection, +) from metadata.generated.schema.entity.services.connections.mlmodel.mlflowConnection import ( MlflowConnection, ) @@ -1270,3 +1274,19 @@ def _(connection: AmundsenClient) -> None: except Exception as exc: msg = f"Unknown error connecting with {connection}: {exc}." raise SourceConnectionException(msg) + + +@get_connection.register +def _(connection: AtlasConnection) -> AtlasClient: + + connection_client = AtlasClient(connection) + return connection_client + + +@test_connection.register +def _(connection: AtlasClient) -> None: + try: + connection.list_entities() + except Exception as exc: + msg = f"Unknown error connecting with {connection}: {exc}." 
+ raise SourceConnectionException(msg) diff --git a/ingestion/src/metadata/utils/amundsen_helper.py b/ingestion/src/metadata/utils/metadata_service_helper.py similarity index 90% rename from ingestion/src/metadata/utils/amundsen_helper.py rename to ingestion/src/metadata/utils/metadata_service_helper.py index a72b4b875ac..e661e289c80 100644 --- a/ingestion/src/metadata/utils/amundsen_helper.py +++ b/ingestion/src/metadata/utils/metadata_service_helper.py @@ -37,6 +37,15 @@ SERVICE_TYPE_MAPPER = { } }, }, + "kafka": { + "service_name": "Kafka", + "connection": { + "config": { + "bootstrapServers": "localhost:9092", + "schemaRegistryURL": "http://localhost:8081", + } + }, + }, "bigquery": { "service_name": "BigQuery", "connection": {"config": {"credentials": "credentials"}}, diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/metadata/atlasConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/metadata/atlasConnection.json index 803c3619642..c58d7d4dbb2 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/metadata/atlasConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/metadata/atlasConnection.json @@ -29,36 +29,12 @@ "format": "password" }, "hostPort": { - "description": "Host and port of the data source.", + "description": "Host and port of the Atlas service.", "title": "Host and Port", "type": "string", "format": "uri", "expose": true }, - "entityTypes": { - "description": "entity types of the data source.", - "type": "string" - }, - "serviceType": { - "description": "service type of the data source.", - "type": "string" - }, - "atlasHost": { - "description": "Atlas Host of the data source.", - "type": "string" - }, - "dbService": { - "description": "source database of the data source.", - "type": "string" - }, - "messagingService": { - "description": "messaging service source of the data source.", - 
"type": "string" - }, - "database": { - "description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank , OpenMetadata Ingestion attempts to scan all the databases in Atlas.", - "type": "string" - }, "connectionOptions": { "$ref": "../connectionBasicType.json#/definitions/connectionOptions" },