diff --git a/ingestion/src/metadata/ingestion/source/metadata/atlas/client.py b/ingestion/src/metadata/ingestion/source/metadata/atlas/client.py index 379936f1ae7..e7f3189caf6 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/atlas/client.py +++ b/ingestion/src/metadata/ingestion/source/metadata/atlas/client.py @@ -41,8 +41,8 @@ class AtlasClient: self.client = REST(client_config) self._use_raw_data = raw_data - def list_entities(self, entity_type="Table") -> List[str]: - response = self.client.get(f"/atlas/entities?type={entity_type}") + def list_entities(self) -> List[str]: + response = self.client.get(f"/atlas/entities?type={self.config.entity_type}") if "error" in response.keys(): raise APIError(response["error"]) diff --git a/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py index ef5b992bd60..18fac827f44 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py @@ -63,10 +63,6 @@ logger = ingestion_logger() ATLAS_TAG_CATEGORY = "AtlasMetadata" ATLAS_TABLE_TAG = "atlas_table" -ENTITY_TYPES = { - "Table": {"Table": {"db": "db", "column": "columns"}}, - "Topic": {"Topic": {"schema": "schema"}}, -} class AtlasSourceStatus(SourceStatus): @@ -109,6 +105,12 @@ class AtlasSource(Source): self.service = None self.message_service = None + self.entity_types = { + "Table": { + self.service_connection.entity_type: {"db": "db", "column": "columns"} + }, + "Topic": {"Topic": {"schema": "schema"}}, + } @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): @@ -132,9 +134,9 @@ class AtlasSource(Source): entity=DatabaseService, fqn=service ) if check_service: - for key in ENTITY_TYPES["Table"]: + for key in self.entity_types["Table"]: self.service = check_service - self.tables[key] = self.atlas_client.list_entities(entity_type=key) + self.tables[key] = self.atlas_client.list_entities() if self.tables.get(key, None): for key in self.tables: yield from self._parse_table_entity(key, self.tables[key]) @@ -148,9 +150,9 @@ class AtlasSource(Source): entity=MessagingService, fqn=service ) if check_service: - for key in ENTITY_TYPES["Topic"]: + for key in self.entity_types["Topic"]: self.message_service = check_service - self.topics[key] = self.atlas_client.list_entities(entity_type=key) + self.topics[key] = self.atlas_client.list_entities() if self.topics.get(key, None): for topic in self.topics: yield from self._parse_topic_entity(topic) @@ -213,7 +215,7 @@ class AtlasSource(Source): tbl_attrs = tbl_entity["attributes"] db_entity = tbl_entity["relationshipAttributes"][ - ENTITY_TYPES["Table"][name]["db"] + self.entity_types["Table"][name]["db"] ] database_fqn = fqn.build( @@ -328,7 +330,7 @@ class AtlasSource(Source): def _parse_table_columns(self, table_response, tbl_entity, name) -> List[Column]: om_cols = [] col_entities = tbl_entity["relationshipAttributes"][ - ENTITY_TYPES["Table"][name]["column"] + self.entity_types["Table"][name]["column"] ] referred_entities = table_response["referredEntities"] ordinal_pos = 1 @@ -370,11 +372,11 @@ class AtlasSource(Source): tbl_entity = self.atlas_client.get_entity(lineage_response["baseEntityGuid"]) for key in tbl_entity["referredEntities"].keys(): if not tbl_entity["entities"][0]["relationshipAttributes"].get( - ENTITY_TYPES["Table"][name]["db"] + self.entity_types["Table"][name]["db"] ): continue db_entity = tbl_entity["entities"][0]["relationshipAttributes"][ - ENTITY_TYPES["Table"][name]["db"] + self.entity_types["Table"][name]["db"] ] if not tbl_entity["referredEntities"].get(key): continue @@ -402,7 +404,7 @@ class AtlasSource(Source): tbl_entity = self.atlas_client.get_entity(edge["toEntityId"]) for key in tbl_entity["referredEntities"]: db_entity = tbl_entity["entities"][0]["relationshipAttributes"][ - ENTITY_TYPES["Table"][name]["db"] + self.entity_types["Table"][name]["db"] ] db = self.get_database_entity(db_entity["displayText"]) diff --git a/ingestion/tests/unit/topology/metadata/test_atlas.py b/ingestion/tests/unit/topology/metadata/test_atlas.py index 842f27c921b..0a6b1bd58d0 100644 --- a/ingestion/tests/unit/topology/metadata/test_atlas.py +++ b/ingestion/tests/unit/topology/metadata/test_atlas.py @@ -65,6 +65,7 @@ mock_atlas_config = { "password": "password", "databaseServiceName": ["hive"], "messagingServiceName": [], + "entity_type": "NotTable", } }, "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, @@ -106,7 +107,7 @@ def mock_get_entity(self, table): # pylint: disable=unused-argument return mock_data -def mock_list_entities(self, entity_type): # pylint: disable=unused-argument +def mock_list_entities(self): # pylint: disable=unused-argument return LIST_ENTITIES diff --git a/openmetadata-docs/content/connectors/metadata/atlas/airflow.md b/openmetadata-docs/content/connectors/metadata/atlas/airflow.md index d3b87bca6a8..26de0de99e1 100644 --- a/openmetadata-docs/content/connectors/metadata/atlas/airflow.md +++ b/openmetadata-docs/content/connectors/metadata/atlas/airflow.md @@ -60,6 +60,7 @@ source: password: password databaseServiceName: ["local_hive"] # create database service and messaging service and pass `service name` here messagingServiceName: [] + entity_type: Table # this entity must be present on atlas sourceConfig: config: type: DatabaseMetadata @@ -79,6 +80,7 @@ workflowConfig: - **Password**: password to connect to the Atlas. - **databaseServiceName**: source database of the data source(Database service that you created from UI. example- local_hive) - **messagingServiceName**: messaging service source of the data source. +- **entity_type**: Name of the entity type in Atlas. #### Sink Configuration diff --git a/openmetadata-docs/content/connectors/metadata/atlas/cli.md b/openmetadata-docs/content/connectors/metadata/atlas/cli.md index c7781ebfb60..9c79a3a5eae 100644 --- a/openmetadata-docs/content/connectors/metadata/atlas/cli.md +++ b/openmetadata-docs/content/connectors/metadata/atlas/cli.md @@ -61,6 +61,7 @@ source: password: password databaseServiceName: ["local_hive"] # pass database service here messagingServiceName: [] # pass messaging service here + entity_type: Table # this entity must be present on atlas sourceConfig: config: type: DatabaseMetadata @@ -81,6 +82,7 @@ workflowConfig: - **Password**: password to connect to the Atlas. - **databaseServiceName**: source database of the data source(Database service that you created from UI. example- local_hive) - **messagingServiceName**: messaging service source of the data source. +- **entity_type**: Name of the entity type in Atlas. #### Sink Configuration diff --git a/openmetadata-docs/content/connectors/metadata/atlas/index.md b/openmetadata-docs/content/connectors/metadata/atlas/index.md index e77e1494c1a..7b0dacc9ed2 100644 --- a/openmetadata-docs/content/connectors/metadata/atlas/index.md +++ b/openmetadata-docs/content/connectors/metadata/atlas/index.md @@ -79,6 +79,7 @@ Pass the `service name` in your config like given below password: password databaseServiceName: ["local_hive"] # pass database service here messagingServiceName: [] # pass messaging service here + entity_type: Table # this entity must be present on atlas ``` ## Metadata Ingestion @@ -180,6 +181,7 @@ the changes. - **Password**: password to connect to the Atlas. - **databaseServiceName**: source database of the data source(Database service that you created from UI. example- local_hive) - **messagingServiceName**: messaging service source of the data source. +- **Entity_Type**: Name of the entity type in Atlas. ### 6. Schedule the Ingestion and Deploy @@ -245,6 +247,7 @@ source: password: password databaseServiceName: ["local_hive"] # create database service and messaging service and pass `service name` here messagingServiceName: [] + entity_type: Table sourceConfig: config: type: DatabaseMetadata @@ -266,6 +269,7 @@ You can find all the definitions and types for the `serviceConnection` [here](ht - `hostPort`: Atlas Host of the data source. - `databaseServiceName`: source database of the data source(Database service that you created from UI. example- local_hive). - `messagingServiceName`: messaging service source of the data source. +- `entity_type`: Name of the entity type in Atlas. ### Sink Configuration diff --git a/openmetadata-docs/images/openmetadata/connectors/atlas/connection-options.png b/openmetadata-docs/images/openmetadata/connectors/atlas/connection-options.png index da7f83e7472..a79fdcff082 100644 Binary files a/openmetadata-docs/images/openmetadata/connectors/atlas/connection-options.png and b/openmetadata-docs/images/openmetadata/connectors/atlas/connection-options.png differ diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/metadata/atlasConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/metadata/atlasConnection.json index ddbf481e313..1db28305b50 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/metadata/atlasConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/metadata/atlasConnection.json @@ -49,6 +49,11 @@ "type": "string" } }, + "entity_type": { + "title": "Entity Type", + "description": "Name of the Entity Type availabe in Atlas", + "type": "string" + }, "connectionOptions": { "$ref": "../connectionBasicType.json#/definitions/connectionOptions" }, @@ -59,5 +64,6 @@ "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" } }, + "required": ["entity_type", "username", "password"], "additionalProperties": false }