Fix: atlas connector (#9034)

This commit is contained in:
NiharDoshi99 2022-11-29 23:55:05 +05:30 committed by GitHub
parent 31d711defd
commit 34aacb20d6
8 changed files with 124 additions and 72 deletions

.gitignore vendored
View File

@@ -26,6 +26,7 @@ catalog-services/catalog-services.iml
# local docker volume
docker/local-metadata/docker-volume
docker-volume
# Java template
*.class

View File

@@ -28,12 +28,11 @@ class AtlasClient:
def __init__(self, config: AtlasConnection, raw_data: bool = False):
self.config = config
config_obj = self.config.serviceConnection.__root__.config
self.auth_token = generate_http_basic_token(
config_obj.username, config_obj.password.get_secret_value()
config.username, config.password.get_secret_value()
)
client_config: ClientConfig = ClientConfig(
base_url=config_obj.atlasHost,
base_url=config.hostPort,
auth_header="Authorization",
api_version="api",
auth_token=self.get_auth_token,
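
With the flattened AtlasConnection model, the client now reads username, password, and hostPort straight from the connection object instead of a nested config. As a rough sketch (an assumption, since the helper's body is not part of this diff), generate_http_basic_token presumably does something like the following; basic_auth_token is a hypothetical stand-in, not code from this commit:

import base64

def basic_auth_token(username: str, password: str) -> str:
    # Assumption: "username:password" base64-encoded, as expected by an
    # "Authorization: Basic ..." header against the Atlas REST API.
    return base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("utf-8")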

View File

@@ -1,7 +0,0 @@
Table:
rdbms_table:
db: rdbms_db
column: rdbms_column
Topic:
- kafka_topic
- kafka_topic_2
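
This per-source YAML mapping is removed entirely; the connector now ships a fixed mapping as the ENTITY_TYPES constant shown in the Atlas source diff below. For comparison, a small sketch of what the old file yielded when parsed versus the new in-code constant (OLD_MAPPING is an illustrative name, not from the codebase):

import yaml

# What the removed file produced when loaded with a safe loader.
OLD_MAPPING = yaml.safe_load(
    "Table:\n  rdbms_table:\n    db: rdbms_db\n    column: rdbms_column\n"
    "Topic:\n  - kafka_topic\n  - kafka_topic_2\n"
)
# -> {"Table": {"rdbms_table": {"db": "rdbms_db", "column": "rdbms_column"}},
#     "Topic": ["kafka_topic", "kafka_topic_2"]}

# The fixed replacement defined in the Atlas source:
ENTITY_TYPES = {"Table": {"Table": {"db": "db", "column": "columns"}}}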

View File

@@ -66,10 +66,10 @@ from metadata.ingestion.ometa.client_utils import get_chart_entities_from_id
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.utils import fqn
from metadata.utils.amundsen_helper import SERVICE_TYPE_MAPPER
from metadata.utils.connections import get_connection
from metadata.utils.helpers import get_standard_chart_type
from metadata.utils.logger import ingestion_logger
from metadata.utils.metadata_service_helper import SERVICE_TYPE_MAPPER
from metadata.utils.sql_queries import (
NEO4J_AMUNDSEN_DASHBOARD_QUERY,
NEO4J_AMUNDSEN_TABLE_QUERY,

View File

@@ -15,11 +15,8 @@ Atlas source to extract metadata
import traceback
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List
import yaml
from metadata.clients.atlas_client import AtlasClient
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
@@ -28,6 +25,16 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.api.services.createMessagingService import (
CreateMessagingServiceRequest,
)
from metadata.generated.schema.api.tags.createTag import CreateTagRequest
from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.pipeline import Pipeline
@@ -38,21 +45,28 @@ from metadata.generated.schema.entity.services.connections.metadata.atlasConnect
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.utils import fqn
from metadata.utils.connections import get_connection
from metadata.utils.logger import ingestion_logger
from metadata.utils.metadata_service_helper import SERVICE_TYPE_MAPPER
logger = ingestion_logger()
ATLAS_TAG_CATEGORY = "AtlasMetadata"
ATLAS_TABLE_TAG = "atlas_table"
ENTITY_TYPES = {"Table": {"Table": {"db": "db", "column": "columns"}}}
class AtlasSourceStatus(SourceStatus):
tables_scanned: List[str] = []
@@ -88,20 +102,7 @@ class AtlasSource(Source):
self.service_connection = self.config.serviceConnection.__root__.config
self.status = AtlasSourceStatus()
self.schema_registry_url = "http://localhost:8081"
self.bootstrap_servers = "http://localhost:9092"
self.atlas_client = AtlasClient(config)
path = Path(self.service_connection.entityTypes)
if not path.is_file():
logger.error(f"File not found {self.service_connection.entityTypes}")
raise FileNotFoundError()
with open(
self.service_connection.entityTypes, "r", encoding="utf-8"
) as entity_types_file:
self.service_connection.entityTypes = yaml.load(
entity_types_file, Loader=yaml.SafeLoader
)
self.atlas_client = get_connection(self.service_connection)
self.tables: Dict[str, Any] = {}
self.topics: Dict[str, Any] = {}
@@ -125,16 +126,12 @@ class AtlasSource(Source):
"""
def next_record(self):
for key in self.service_connection.entityTypes["Table"].keys():
self.service = self.metadata.get_by_name(
entity=DatabaseService, fqn=self.service_connection.dbService
)
for key in ENTITY_TYPES["Table"]:
self.service = self.get_database_service()
self.tables[key] = self.atlas_client.list_entities(entity_type=key)
for key in self.service_connection.entityTypes.get("Topic", []):
self.message_service = self.metadata.get_by_name(
entity=MessagingService, fqn=self.service_connection.messagingService
)
for key in ENTITY_TYPES.get("Topic", []):
self.message_service = self.get_message_service()
self.topics[key] = self.atlas_client.list_entities(entity_type=key)
if self.tables:
@@ -188,17 +185,15 @@
)
tbl_attrs = tbl_entity["attributes"]
db_entity = tbl_entity["relationshipAttributes"][
self.service_connection.entityTypes["Table"][name]["db"]
ENTITY_TYPES["Table"][name]["db"]
]
yield self.get_database_entity(db_entity["displayText"])
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
service_name=self.service_connection.dbService,
service_name=self.service.name.__root__,
database_name=db_entity["displayText"],
)
database_object = self.metadata.get_by_name(
entity=Database, fqn=database_fqn
)
@@ -213,7 +208,7 @@
database_schema_fqn = fqn.build(
self.metadata,
entity_type=DatabaseSchema,
service_name=self.config.serviceConnection.__root__.config.dbService,
service_name=self.service.name.__root__,
database_name=db_entity["displayText"],
schema_name=db_entity["displayText"],
)
@@ -221,6 +216,8 @@
entity=DatabaseSchema, fqn=database_schema_fqn
)
yield self.create_tag()
yield CreateTableRequest(
name=tbl_attrs["name"],
databaseSchema=EntityReference(
@@ -228,6 +225,7 @@
),
description=tbl_attrs["description"],
columns=tbl_columns,
tags=self.get_tags(),
)
yield from self.ingest_lineage(tbl_entity["guid"], name)
@@ -236,10 +234,38 @@
logger.debug(traceback.format_exc())
logger.warning(f"Failed to parse {table_entity}: {exc}")
def get_tags(self):
tags = [
TagLabel(
tagFQN=fqn.build(
self.metadata,
Tag,
tag_category_name=ATLAS_TAG_CATEGORY,
tag_name=ATLAS_TABLE_TAG,
),
labelType="Automated",
state="Suggested",
source="Tag",
)
]
return tags
def create_tag(self) -> OMetaTagAndCategory:
atlas_table_tag = OMetaTagAndCategory(
category_name=CreateTagCategoryRequest(
name=ATLAS_TAG_CATEGORY,
description="Tags associates with atlas entities",
),
category_details=CreateTagRequest(
name=ATLAS_TABLE_TAG, description="Atlas Cluster Tag"
),
)
return atlas_table_tag
def _parse_table_columns(self, table_response, tbl_entity, name) -> List[Column]:
om_cols = []
col_entities = tbl_entity["relationshipAttributes"][
self.service_connection.entityTypes["Table"][name]["column"]
ENTITY_TYPES["Table"][name]["column"]
]
referred_entities = table_response["referredEntities"]
ordinal_pos = 1
@@ -281,11 +307,11 @@
tbl_entity = self.atlas_client.get_entity(lineage_response["baseEntityGuid"])
for key in tbl_entity["referredEntities"].keys():
if not tbl_entity["entities"][0]["relationshipAttributes"].get(
self.service_connection.entityTypes["Table"][name]["db"]
ENTITY_TYPES["Table"][name]["db"]
):
continue
db_entity = tbl_entity["entities"][0]["relationshipAttributes"][
self.service_connection.entityTypes["Table"][name]["db"]
ENTITY_TYPES["Table"][name]["db"]
]
if not tbl_entity["referredEntities"].get(key):
continue
@@ -313,7 +339,7 @@
tbl_entity = self.atlas_client.get_entity(edge["toEntityId"])
for key in tbl_entity["referredEntities"]:
db_entity = tbl_entity["entities"][0]["relationshipAttributes"][
self.service_connection.entityTypes["Table"][name]["db"]
ENTITY_TYPES["Table"][name]["db"]
]
db = self.get_database_entity(db_entity["displayText"])
@@ -333,6 +359,34 @@
)
yield from self.yield_lineage(from_entity_ref, to_entity_ref)
def get_database_service(self):
service = self.metadata.create_or_update(
CreateDatabaseServiceRequest(
name=SERVICE_TYPE_MAPPER.get("hive")["service_name"],
displayName="hive",
serviceType=SERVICE_TYPE_MAPPER.get("hive")["service_name"],
connection=SERVICE_TYPE_MAPPER["hive"]["connection"],
)
)
if service is not None:
return service
logger.error("Failed to create a service with name detlaLake")
return None
def get_message_service(self):
service = self.metadata.create_or_update(
CreateMessagingServiceRequest(
name=SERVICE_TYPE_MAPPER.get("kafka")["service_name"],
displayName=SERVICE_TYPE_MAPPER.get("kafka")["service_name"],
serviceType=SERVICE_TYPE_MAPPER.get("kafka")["service_name"],
connection=SERVICE_TYPE_MAPPER.get("kafka")["connection"],
)
)
if service is not None:
return service
logger.error("Failed to create a service with name kafka")
return None
def yield_lineage(self, from_entity_ref, to_entity_ref):
if from_entity_ref and to_entity_ref and from_entity_ref != to_entity_ref:
lineage = AddLineageRequest(
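
The hunk is cut off where yield_lineage assembles the request. A hedged sketch of how that request is presumably completed, assuming the generated AddLineageRequest and EntitiesEdge models take fromEntity/toEntity references as they do elsewhere in OpenMetadata (build_lineage_request is an illustrative helper, not code from this commit):

from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import EntitiesEdge

def build_lineage_request(from_entity_ref, to_entity_ref):
    # Mirror the guard above: skip self-edges and half-resolved pairs.
    if from_entity_ref and to_entity_ref and from_entity_ref != to_entity_ref:
        return AddLineageRequest(
            edge=EntitiesEdge(fromEntity=from_entity_ref, toEntity=to_entity_ref)
        )
    return None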

View File

@@ -29,6 +29,7 @@ from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.orm.session import Session
from sqlalchemy.pool import QueuePool
from metadata.clients.atlas_client import AtlasClient
from metadata.clients.connection_clients import (
AirByteClient,
AmundsenClient,
@@ -127,6 +128,9 @@ from metadata.generated.schema.entity.services.connections.messaging.redpandaCon
from metadata.generated.schema.entity.services.connections.metadata.amundsenConnection import (
AmundsenConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.atlasConnection import (
AtlasConnection,
)
from metadata.generated.schema.entity.services.connections.mlmodel.mlflowConnection import (
MlflowConnection,
)
@@ -1270,3 +1274,19 @@ def _(connection: AmundsenClient) -> None:
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
@get_connection.register
def _(connection: AtlasConnection) -> AtlasClient:
connection_client = AtlasClient(connection)
return connection_client
@test_connection.register
def _(connection: AtlasClient) -> None:
try:
connection.list_entities()
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
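
With these registrations, Atlas goes through the same singledispatch helpers as every other connector: get_connection builds an AtlasClient from the connection config, and test_connection probes it with list_entities(). A minimal usage sketch (check_atlas is illustrative; the connection argument is an AtlasConnection built from the workflow config):

from metadata.utils.connections import get_connection, test_connection

def check_atlas(connection):
    client = get_connection(connection)  # dispatches to the AtlasConnection branch above
    test_connection(client)              # calls client.list_entities(), raising SourceConnectionException on failure
    return client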

View File

@@ -37,6 +37,15 @@ SERVICE_TYPE_MAPPER = {
}
},
},
"kafka": {
"service_name": "Kafka",
"connection": {
"config": {
"bootstrapServers": "localhost:9092",
"schemaRegistryURL": "http://localhost:8081",
}
},
},
"bigquery": {
"service_name": "BigQuery",
"connection": {"config": {"credentials": "credentials"}},

View File

@@ -29,36 +29,12 @@
"format": "password"
},
"hostPort": {
"description": "Host and port of the data source.",
"description": "Host and port of the Atlas service.",
"title": "Host and Port",
"type": "string",
"format": "uri",
"expose": true
},
"entityTypes": {
"description": "entity types of the data source.",
"type": "string"
},
"serviceType": {
"description": "service type of the data source.",
"type": "string"
},
"atlasHost": {
"description": "Atlas Host of the data source.",
"type": "string"
},
"dbService": {
"description": "source database of the data source.",
"type": "string"
},
"messagingService": {
"description": "messaging service source of the data source.",
"type": "string"
},
"database": {
"description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank , OpenMetadata Ingestion attempts to scan all the databases in Atlas.",
"type": "string"
},
"connectionOptions": {
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"
},