Fix: atlas test connection (#9791)

* Fix: atlas test connection

* Fix: docs changes

* Fix: docs changes

* Fix: python checkstyle

* Fix: python test
This commit is contained in:
NiharDoshi99 2023-01-18 20:16:07 +05:30 committed by GitHub
parent 1d874f49d0
commit f2649041f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 33 additions and 16 deletions

View File

@ -41,8 +41,8 @@ class AtlasClient:
self.client = REST(client_config) self.client = REST(client_config)
self._use_raw_data = raw_data self._use_raw_data = raw_data
def list_entities(self, entity_type="Table") -> List[str]: def list_entities(self) -> List[str]:
response = self.client.get(f"/atlas/entities?type={entity_type}") response = self.client.get(f"/atlas/entities?type={self.config.entity_type}")
if "error" in response.keys(): if "error" in response.keys():
raise APIError(response["error"]) raise APIError(response["error"])

View File

@ -63,10 +63,6 @@ logger = ingestion_logger()
ATLAS_TAG_CATEGORY = "AtlasMetadata" ATLAS_TAG_CATEGORY = "AtlasMetadata"
ATLAS_TABLE_TAG = "atlas_table" ATLAS_TABLE_TAG = "atlas_table"
ENTITY_TYPES = {
"Table": {"Table": {"db": "db", "column": "columns"}},
"Topic": {"Topic": {"schema": "schema"}},
}
class AtlasSourceStatus(SourceStatus): class AtlasSourceStatus(SourceStatus):
@ -109,6 +105,12 @@ class AtlasSource(Source):
self.service = None self.service = None
self.message_service = None self.message_service = None
self.entity_types = {
"Table": {
self.service_connection.entity_type: {"db": "db", "column": "columns"}
},
"Topic": {"Topic": {"schema": "schema"}},
}
@classmethod @classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection): def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@ -132,9 +134,9 @@ class AtlasSource(Source):
entity=DatabaseService, fqn=service entity=DatabaseService, fqn=service
) )
if check_service: if check_service:
for key in ENTITY_TYPES["Table"]: for key in self.entity_types["Table"]:
self.service = check_service self.service = check_service
self.tables[key] = self.atlas_client.list_entities(entity_type=key) self.tables[key] = self.atlas_client.list_entities()
if self.tables.get(key, None): if self.tables.get(key, None):
for key in self.tables: for key in self.tables:
yield from self._parse_table_entity(key, self.tables[key]) yield from self._parse_table_entity(key, self.tables[key])
@ -148,9 +150,9 @@ class AtlasSource(Source):
entity=MessagingService, fqn=service entity=MessagingService, fqn=service
) )
if check_service: if check_service:
for key in ENTITY_TYPES["Topic"]: for key in self.entity_types["Topic"]:
self.message_service = check_service self.message_service = check_service
self.topics[key] = self.atlas_client.list_entities(entity_type=key) self.topics[key] = self.atlas_client.list_entities()
if self.topics.get(key, None): if self.topics.get(key, None):
for topic in self.topics: for topic in self.topics:
yield from self._parse_topic_entity(topic) yield from self._parse_topic_entity(topic)
@ -213,7 +215,7 @@ class AtlasSource(Source):
tbl_attrs = tbl_entity["attributes"] tbl_attrs = tbl_entity["attributes"]
db_entity = tbl_entity["relationshipAttributes"][ db_entity = tbl_entity["relationshipAttributes"][
ENTITY_TYPES["Table"][name]["db"] self.entity_types["Table"][name]["db"]
] ]
database_fqn = fqn.build( database_fqn = fqn.build(
@ -328,7 +330,7 @@ class AtlasSource(Source):
def _parse_table_columns(self, table_response, tbl_entity, name) -> List[Column]: def _parse_table_columns(self, table_response, tbl_entity, name) -> List[Column]:
om_cols = [] om_cols = []
col_entities = tbl_entity["relationshipAttributes"][ col_entities = tbl_entity["relationshipAttributes"][
ENTITY_TYPES["Table"][name]["column"] self.entity_types["Table"][name]["column"]
] ]
referred_entities = table_response["referredEntities"] referred_entities = table_response["referredEntities"]
ordinal_pos = 1 ordinal_pos = 1
@ -370,11 +372,11 @@ class AtlasSource(Source):
tbl_entity = self.atlas_client.get_entity(lineage_response["baseEntityGuid"]) tbl_entity = self.atlas_client.get_entity(lineage_response["baseEntityGuid"])
for key in tbl_entity["referredEntities"].keys(): for key in tbl_entity["referredEntities"].keys():
if not tbl_entity["entities"][0]["relationshipAttributes"].get( if not tbl_entity["entities"][0]["relationshipAttributes"].get(
ENTITY_TYPES["Table"][name]["db"] self.entity_types["Table"][name]["db"]
): ):
continue continue
db_entity = tbl_entity["entities"][0]["relationshipAttributes"][ db_entity = tbl_entity["entities"][0]["relationshipAttributes"][
ENTITY_TYPES["Table"][name]["db"] self.entity_types["Table"][name]["db"]
] ]
if not tbl_entity["referredEntities"].get(key): if not tbl_entity["referredEntities"].get(key):
continue continue
@ -402,7 +404,7 @@ class AtlasSource(Source):
tbl_entity = self.atlas_client.get_entity(edge["toEntityId"]) tbl_entity = self.atlas_client.get_entity(edge["toEntityId"])
for key in tbl_entity["referredEntities"]: for key in tbl_entity["referredEntities"]:
db_entity = tbl_entity["entities"][0]["relationshipAttributes"][ db_entity = tbl_entity["entities"][0]["relationshipAttributes"][
ENTITY_TYPES["Table"][name]["db"] self.entity_types["Table"][name]["db"]
] ]
db = self.get_database_entity(db_entity["displayText"]) db = self.get_database_entity(db_entity["displayText"])

View File

@ -65,6 +65,7 @@ mock_atlas_config = {
"password": "password", "password": "password",
"databaseServiceName": ["hive"], "databaseServiceName": ["hive"],
"messagingServiceName": [], "messagingServiceName": [],
"entity_type": "NotTable",
} }
}, },
"sourceConfig": {"config": {"type": "DatabaseMetadata"}}, "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
@ -106,7 +107,7 @@ def mock_get_entity(self, table): # pylint: disable=unused-argument
return mock_data return mock_data
def mock_list_entities(self, entity_type): # pylint: disable=unused-argument def mock_list_entities(self): # pylint: disable=unused-argument
return LIST_ENTITIES return LIST_ENTITIES

View File

@ -60,6 +60,7 @@ source:
password: password password: password
databaseServiceName: ["local_hive"] # create database service and messaging service and pass `service name` here databaseServiceName: ["local_hive"] # create database service and messaging service and pass `service name` here
messagingServiceName: [] messagingServiceName: []
entity_type: Table # this entity must be present on atlas
sourceConfig: sourceConfig:
config: config:
type: DatabaseMetadata type: DatabaseMetadata
@ -79,6 +80,7 @@ workflowConfig:
- **Password**: password to connect to the Atlas. - **Password**: password to connect to the Atlas.
- **databaseServiceName**: source database of the data source(Database service that you created from UI. example- local_hive) - **databaseServiceName**: source database of the data source(Database service that you created from UI. example- local_hive)
- **messagingServiceName**: messaging service source of the data source. - **messagingServiceName**: messaging service source of the data source.
- **entity_type**: Name of the entity type in Atlas.
#### Sink Configuration #### Sink Configuration

View File

@ -61,6 +61,7 @@ source:
password: password password: password
databaseServiceName: ["local_hive"] # pass database service here databaseServiceName: ["local_hive"] # pass database service here
messagingServiceName: [] # pass messaging service here messagingServiceName: [] # pass messaging service here
entity_type: Table # this entity must be present on atlas
sourceConfig: sourceConfig:
config: config:
type: DatabaseMetadata type: DatabaseMetadata
@ -81,6 +82,7 @@ workflowConfig:
- **Password**: password to connect to the Atlas. - **Password**: password to connect to the Atlas.
- **databaseServiceName**: source database of the data source(Database service that you created from UI. example- local_hive) - **databaseServiceName**: source database of the data source(Database service that you created from UI. example- local_hive)
- **messagingServiceName**: messaging service source of the data source. - **messagingServiceName**: messaging service source of the data source.
- **entity_type**: Name of the entity type in Atlas.
#### Sink Configuration #### Sink Configuration

View File

@ -79,6 +79,7 @@ Pass the `service name` in your config like given below
password: password password: password
databaseServiceName: ["local_hive"] # pass database service here databaseServiceName: ["local_hive"] # pass database service here
messagingServiceName: [] # pass messaging service here messagingServiceName: [] # pass messaging service here
entity_type: Table # this entity must be present on atlas
``` ```
## Metadata Ingestion ## Metadata Ingestion
@ -180,6 +181,7 @@ the changes.
- **Password**: password to connect to the Atlas. - **Password**: password to connect to the Atlas.
- **databaseServiceName**: source database of the data source(Database service that you created from UI. example- local_hive) - **databaseServiceName**: source database of the data source(Database service that you created from UI. example- local_hive)
- **messagingServiceName**: messaging service source of the data source. - **messagingServiceName**: messaging service source of the data source.
- **Entity_Type**: Name of the entity type in Atlas.
### 6. Schedule the Ingestion and Deploy ### 6. Schedule the Ingestion and Deploy
@ -245,6 +247,7 @@ source:
password: password password: password
databaseServiceName: ["local_hive"] # create database service and messaging service and pass `service name` here databaseServiceName: ["local_hive"] # create database service and messaging service and pass `service name` here
messagingServiceName: [] messagingServiceName: []
entity_type: Table
sourceConfig: sourceConfig:
config: config:
type: DatabaseMetadata type: DatabaseMetadata
@ -266,6 +269,7 @@ You can find all the definitions and types for the `serviceConnection` [here](ht
- `hostPort`: Atlas Host of the data source. - `hostPort`: Atlas Host of the data source.
- `databaseServiceName`: source database of the data source(Database service that you created from UI. example- local_hive). - `databaseServiceName`: source database of the data source(Database service that you created from UI. example- local_hive).
- `messagingServiceName`: messaging service source of the data source. - `messagingServiceName`: messaging service source of the data source.
- `entity_type`: Name of the entity type in Atlas.
### Sink Configuration ### Sink Configuration

Binary file not shown.

Before

Width:  |  Height:  |  Size: 547 KiB

After

Width:  |  Height:  |  Size: 311 KiB

View File

@ -49,6 +49,11 @@
"type": "string" "type": "string"
} }
}, },
"entity_type": {
"title": "Entity Type",
"description": "Name of the Entity Type availabe in Atlas",
"type": "string"
},
"connectionOptions": { "connectionOptions": {
"$ref": "../connectionBasicType.json#/definitions/connectionOptions" "$ref": "../connectionBasicType.json#/definitions/connectionOptions"
}, },
@ -59,5 +64,6 @@
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
} }
}, },
"required": ["entity_type", "username", "password"],
"additionalProperties": false "additionalProperties": false
} }